Summary: The space includes a special mechanism that detects clients that cannot consume notifications fast enough, i.e. slow consumers.
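Handling Slow Consumers
Traditionally, messaging-oriented middleware products have several methods for handling slow consumers, including blocking or slowing down the producer, spooling messages to disk and replaying them at a later time, and discarding messages for the slow consumer.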
All of these options can be very costly in terms of performance and reliability. The options that guarantee reliability and consistency are blocking or slowing down the producer, and spooling messages to disk and replaying them at a later time. Both carry a very high price in performance and scalability, one that affects not just the slow consumers but the entire cluster. Spooling messages to disk and replaying them later is considered more popular because it affects the performance of the non-slow consumers only indirectly, whereas blocking or slowing down the producer has a direct and costly effect.

These are the ways slow consumers are handled in the messaging world. With the GigaSpaces In-Memory Data Grid, spooling messages to disk and replaying them later, or discarding messages for the slow consumer, might not be relevant, since in most cases the need is to maintain a consistent local image of a data set on the client side. Updates to this data set are sent to the client via notifications. This local view should remain accessible to the client when there is a connection failure or a slow consumer is running.

A different approach, which cannot be implemented with messaging-oriented middleware, is to get a snapshot of the current state of the data set the client is interested in from the server, and then continue to receive updates from that point onward. This is not a feasible option with traditional messaging systems, since the notion of 'current state' does not exist. The space, on the other hand, inherently maintains the current state, so it makes much more sense to handle slow consumers this way: when a client is detected as a slow consumer, it is disconnected from the space server. The client then reconnects to the server, initializes its state by reading the relevant data set from the space, and then continues to receive notifications.
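The snapshot-then-resume approach described above can be illustrated with a small, self-contained simulation. This is only a sketch: the class and method names (LocalViewSketch, serverWrite, resyncFromSnapshot) are hypothetical and are not part of the GigaSpaces API, and the "space" is stood in for by an in-memory map.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical simulation of "read a snapshot, then resume notifications". */
public class LocalViewSketch {
    /** Stand-in for the current state held by the server (the space). */
    public static final Map<String, String> serverState = new TreeMap<>();

    /** The client's local image of the data set. */
    public static final Map<String, String> localView = new TreeMap<>();

    /**
     * The server applies an update. The client only sees it as a
     * notification while its registration is active.
     */
    public static void serverWrite(String key, String value, boolean clientRegistered) {
        serverState.put(key, value);
        if (clientRegistered) {
            localView.put(key, value); // notification delivered to the client
        }
    }

    /** Recovery step: rebuild the local image from the server's current state. */
    public static void resyncFromSnapshot() {
        localView.clear();
        localView.putAll(serverState);
    }

    public static void main(String[] args) {
        serverWrite("a", "1", true);
        // The client was disconnected as a slow consumer, so this update is missed.
        serverWrite("b", "2", false);
        System.out.println("consistent before resync: " + localView.equals(serverState));

        // The client reconnects, reads the snapshot, and re-registers.
        resyncFromSnapshot();
        serverWrite("c", "3", true);
        System.out.println("consistent after resync: " + localView.equals(serverState));
    }
}
```

The point of the sketch is that the client never needs the missed notifications replayed: because the server holds the current state, one snapshot read brings the local image back in line before notifications resume.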
How it Works
When sending notifications to clients, the space uses a special mechanism that detects clients that cannot consume the notifications fast enough, i.e. slow consumers. Slow consumers can cause problems, since they force the space to keep each notification in memory until all consumers have consumed the event. Once the space process memory is full, the space is forced to slow down its responses to clients, which slows down fast consumers as well. In extreme cases, this behavior might eventually cause the space process to exit abnormally with an out-of-memory error.

Once the space detects a slow consumer, it automatically disconnects it by canceling its notification registration, forcing it to reconnect (re-register for notifications). This ensures that fast consumers are not affected by the slow consumer's behavior.

Slow consumers are identified by measuring the notification delivery throughput to a client, measured in bytes (see the throughput parameter). Once the space detects a client that consumes notifications below the defined throughput limit, it waits up to a maximum time (in ms) for the client to recover (see the latency parameter). During the wait period, the space checks several times (see the retries parameter) whether the client has recovered. If the client is still below the defined throughput, its notify registration is canceled.

To allow the client to detect that the space removed its notify registration, the client should register for notifications using a short lease and renew it manually, or automatically using the LeaseRenewalManager. If the renewal fails, the space has canceled the notification registration. In this case, the client should re-register for notifications.

The threadsQueueSize option in the services.config file (see the configuration section below), measured in Entries, configures the maximum size of the communication queue that the client and the space server use when processing incoming requests.
Please note that this parameter applies to both the server and the client once the client is receiving notifications. When the threadsQueueSize on the client side reaches its limit (the client can't consume incoming notifications), the client stops consuming incoming network packets. This in turn triggers the slow consumer mechanism on the space side, which cancels the client's notify registrations.
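The detection rule described in this section (a throughput limit, followed by a bounded number of re-checks during the wait period) can be sketched as a pure decision function. This is an illustrative sketch only, not the space's actual implementation: the class name, the method signature, and the sample values are assumptions, and the throughput samples are assumed to be in the same unit as the limit.

```java
/** Hypothetical sketch of the slow consumer decision; not GigaSpaces code. */
public class SlowConsumerCheckSketch {

    /**
     * Decides whether a client's notify registration should be canceled.
     *
     * @param throughputLimit minimum acceptable delivery throughput
     * @param retries         number of re-checks made during the wait period
     * @param initialSample   throughput measured at the first check
     * @param recheckSamples  throughput measured at each re-check, in order
     * @return true if the client stayed below the limit at every check
     */
    public static boolean shouldCancel(long throughputLimit, int retries,
                                       long initialSample, long[] recheckSamples) {
        if (initialSample >= throughputLimit) {
            return false; // fast enough, not a slow consumer candidate
        }
        int checks = Math.min(retries, recheckSamples.length);
        for (int i = 0; i < checks; i++) {
            if (recheckSamples[i] >= throughputLimit) {
                return false; // the client recovered within the wait period
            }
        }
        return true; // still below the limit after all re-checks
    }

    public static void main(String[] args) {
        long limit = 5000; // assumed value, for illustration only
        int retries = 3;   // matches slowConsumerRetries in the sample configuration

        // Stays slow through all three re-checks: registration is canceled.
        System.out.println(shouldCancel(limit, retries, 1000, new long[] {1000, 2000, 3000}));
        // Recovers at the second re-check: registration is kept.
        System.out.println(shouldCancel(limit, retries, 1000, new long[] {1000, 6000, 1000}));
    }
}
```

A larger retries value gives a temporarily stalled client more chances to recover before it is forced to re-register, at the cost of the space holding its pending notifications in memory for longer.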
The com.gs.fifo_notify.queue property configures the maximum size of the client FIFO queue used with FIFO-based notifications. Once this limit and the threadsQueueSize limit are reached, the space marks the relevant client as a slow consumer candidate.

Configuration - Server Side
To enable the slow consumer mechanism, replace the existing NIOConfiguration block in your services.config file with the following NIOConfiguration:

    com.gigaspaces.transport {
        ...
        nioConfig = new com.gigaspaces.config.lrmi.nio.NIOConfiguration(
            16,   /* min executors threads */
            500,  /* max executors threads */
            1024, /* maxConnPool */
            bindHost, /* bindHostName - if null resolves to the localhost IP address */
            "0",  /* bindPort - if 0 any next free port will be used for incoming client requests.
                     The server port is set by default to 0, which means next free port */
            java.lang.Integer.MAX_VALUE, /* threadsQueueSize */
            0,    /* slowConsumerThroughput */
            100,  /* slowConsumerLatency */
            3     /* slowConsumerRetries */);
        ...
    }

Configuration - Client Side
The following JVM system property determines the FIFO notify queue size (default: Integer.MAX_VALUE):

    -Dcom.gs.fifo_notify.queue=20000

Example
The following example illustrates how to renew the notify registration and re-register for notifications once the client is identified as a slow consumer, i.e. its notification registration is canceled. The example consists of an Entry class and a program class.
Entry Class

    package com.j_spaces.examples.slowconsumer;

    import net.jini.core.entry.Entry;

    public class Message implements Entry {
        public String content;

        public Message() {
        }
    }

Program Class

    package com.j_spaces.examples.slowconsumer;

    import java.rmi.RemoteException;
    import java.util.Date;

    import net.jini.core.event.RemoteEvent;
    import net.jini.core.event.RemoteEventListener;
    import net.jini.core.event.UnknownEventException;
    import net.jini.core.lease.Lease;
    import net.jini.core.transaction.TransactionException;
    import net.jini.lease.LeaseListener;
    import net.jini.lease.LeaseRenewalManager;
    import net.jini.config.ConfigurationException;
    import net.jini.config.ConfigurationProvider;

    import com.j_spaces.core.IJSpace;
    import com.j_spaces.core.client.EntryArrivedRemoteEvent;
    import com.j_spaces.core.client.NotifyDelegator;
    import com.j_spaces.core.client.NotifyModifiers;
    import com.j_spaces.core.client.SpaceFinder;

    public class NotifyLeaseExample implements LeaseListener, RemoteEventListener {
        IJSpace space;
        NotifyDelegator delegate;
        Lease l;
        LeaseRenewalManager lrm;
        int counter = 0;

        public static void main(String[] args) {
            System.out.println("Welcome to Notify Registration Lease Renewal Example!");
            System.out.println("This example will:\n"
                    + "- register for notifications\n"
                    + "- renew registration lease using the LeaseRenewalManager \n"
                    + "- deliberately cancel notify registration - This will trigger LeaseRenewalEvent\n"
                    + "- re-register for notifications");
            System.out.println("\nThe same mechanism can be used to identify cancellation of the lease when space identified the client as slow consumer.");
            if (args.length != 1) {
                System.out.println("Usage: <URL>");
                System.out.println("jini://lookup host/container name/JavaSpaces");
                System.exit(1);
            }
            try {
                IJSpace space = (IJSpace) SpaceFinder.find(args[0]);
                if (space == null) {
                    System.out.println("Space not found: " + args[0]);
                    System.exit(-1);
                }
                // The constructor runs the whole demo and exits the JVM when done.
                new NotifyLeaseExample(space);
                Thread.sleep(1000000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private String getTime() {
            return new Date(System.currentTimeMillis()).toString();
        }

        /**
         * Registers for notifications and obtains the Lease object from the
         * NotifyDelegator, then passes the lease to the LeaseRenewalManager
         * and tells it to renew the lease automatically, 5 seconds at a time,
         * until a cutoff time.
         *
         * Uses "this" object as the LeaseListener since it implements the
         * LeaseListener interface.
         *
         * While the program runs, an Entry is written into the space every
         * 5 seconds. During this time, the LeaseRenewalManager renews the lease.
         *
         * @throws ConfigurationException
         */
        private void registerForNotifications() throws RemoteException,
                TransactionException, ConfigurationException {
            long oneSecond = 1000;
            // Initial lease duration: 12 seconds
            delegate = new NotifyDelegator(space, new Message(), null, this,
                    oneSecond * 12, null, false, NotifyModifiers.NOTIFY_WRITE);
            // Get the notify registration lease
            l = delegate.getEventRegistration().getLease();
            System.out.println(getTime() + " -->>>>> Register for Notification OK!");
            System.out.println(getTime()
                    + " -->>>>> Notify Registration Lease originally set to expire in 12 seconds. . .");
            // Lease is renewed for 5 seconds at a time until the cutoff time
            lrm.renewUntil(l, System.currentTimeMillis() + 1000000, 5000, this);
            System.out.println(getTime()
                    + " -->>>>> RenewalManager will renew lease for 5 seconds from now");
        }

        public NotifyLeaseExample(IJSpace space) throws Exception {
            this.space = space;
            // the higher the first argument the more cpu the LRM uses:
            // 5, 20000 appears to result in success:
            net.jini.config.Configuration conf = getLRMConfiguration(500, 2);
            lrm = new LeaseRenewalManager(conf);
            registerForNotifications();
            Message m = new Message();
            // Write an entry to the space every 5 seconds. On every 5th write,
            // deliberately cancel the notify registration lease.
            // This will trigger LeaseRenewalEvent
            for (int x = 1; x < 20; x++) {
                Thread.sleep(5000);
                m.content = "entry" + (counter++);
                space.write(m, null, Lease.FOREVER);
                System.out.println(getTime() + " -->>>>> " + x * 5
                        + " Seconds have gone by and spaceCount = " + space.count(null, null));
                if (x % 5 == 0) {
                    System.out.println(getTime()
                            + " -->>>>> Deliberately Canceling Notify registration ...");
                    l.cancel();
                    System.out.println(getTime() + " -->>>>> Notify registration Canceled OK!");
                    Thread.sleep(100);
                }
            }
            Thread.sleep(1000);
            System.out.println("End Demo!");
            System.exit(0);
        }

        // this method should only be called when a failure event occurs
        public void notify(net.jini.lease.LeaseRenewalEvent event) {
            System.out.println(getTime()
                    + " -->>>>> LeaseRenewalEvent: Can't renew Lease - Lease canceled - Source:"
                    + event.getSource());
            System.out.println(getTime() + " -->>>>> Re-Register for Notification...");
            try {
                registerForNotifications();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Jini configuration for use with LeaseRenewalManagers. It lets the
         * caller specify the granularity of the LeaseRenewalManager's internal
         * timing as well as its robustness in terms of the number of retries
         * it attempts before declaring a particular lease unreachable.
         */
        public static net.jini.config.Configuration getLRMConfiguration(
                long renewalRTT, long renewalBatchTW) throws ConfigurationException {
            return ConfigurationProvider.getInstance(new String[] {
                    "-",
                    "net.jini.lease.LeaseRenewalManager.roundTripTime=" + renewalRTT,
                    "net.jini.lease.LeaseRenewalManager.renewBatchTimeWindow =" + renewalBatchTW });
        } // end of method getLRMConfiguration

        public void notify(RemoteEvent event) throws UnknownEventException, RemoteException {
            try {
                /*
                 * code to get the entry from the space using the
                 * EntryArrivedRemoteEvent
                 */
                EntryArrivedRemoteEvent eare = (EntryArrivedRemoteEvent) event;
                Message m = (Message) eare.getEntry();
                System.out.println(getTime()
                        + " -->>>>> Received Notification:message received was: " + m.content);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

Output
    Welcome to Notify Registration Lease Example!
    This example will:
    - register for notification
    - renew registration lease using the LeaseRenewalManager
    - Deliberately Cancel Notify registration - This will trigger LeaseRenewalEvent
    - Re-Register for notification
    The same mechanism can be used to identify cancellation of the lease when space identified the client as slow consumer.
    CONFIG: Sets the system property ${com.gs.home} with value: E:\GigaSpacesXAP6.0\
    Thu Apr 26 06:20:56 EDT 2007 -->>>>> Register for Notification OK!
    Thu Apr 26 06:20:56 EDT 2007 -->>>>> Notify Registration Lease originally set to expire in 12 seconds. . .
    Thu Apr 26 06:20:56 EDT 2007 -->>>>> RenewalManager will renew lease for 5 seconds from now
    Thu Apr 26 06:21:01 EDT 2007 -->>>>> Received Notification:message received was: entry0
    Thu Apr 26 06:21:01 EDT 2007 -->>>>> 5 Seconds have gone by and spaceCount = 53
    Thu Apr 26 06:21:06 EDT 2007 -->>>>> Received Notification:message received was: entry1
    Thu Apr 26 06:21:06 EDT 2007 -->>>>> 10 Seconds have gone by and spaceCount = 54
    Thu Apr 26 06:21:11 EDT 2007 -->>>>> 15 Seconds have gone by and spaceCount = 55
    Thu Apr 26 06:21:11 EDT 2007 -->>>>> Received Notification:message received was: entry2
    Thu Apr 26 06:21:16 EDT 2007 -->>>>> Received Notification:message received was: entry3
    Thu Apr 26 06:21:16 EDT 2007 -->>>>> 20 Seconds have gone by and spaceCount = 56
    Thu Apr 26 06:21:21 EDT 2007 -->>>>> Received Notification:message received was: entry4
    Thu Apr 26 06:21:21 EDT 2007 -->>>>> 25 Seconds have gone by and spaceCount = 57
    Thu Apr 26 06:21:21 EDT 2007 -->>>>> Deliberately Canceling Notify registration ...
    Thu Apr 26 06:21:21 EDT 2007 -->>>>> Notify registration canceled OK!
    Thu Apr 26 06:21:24 EDT 2007 -->>>>> LeaseRenewalEvent: Can't renew Lease - Lease canceled - Source:net.jini.lease.LeaseRenewalManager@ea443f
    Thu Apr 26 06:21:24 EDT 2007 -->>>>> Re-Register for Notification...
    Thu Apr 26 06:21:24 EDT 2007 -->>>>> Register for Notification OK!
    Thu Apr 26 06:21:24 EDT 2007 -->>>>> Notify Registration Lease originally set to expire in 12 seconds. . .
    Thu Apr 26 06:21:24 EDT 2007 -->>>>> RenewalManager will renew lease for 5 seconds from now
    Thu Apr 26 06:21:26 EDT 2007 -->>>>> Received Notification:message received was: entry5
    Thu Apr 26 06:21:26 EDT 2007 -->>>>> 30 Seconds have gone by and spaceCount = 58
    Thu Apr 26 06:21:31 EDT 2007 -->>>>> Received Notification:message received was: entry6
    Thu Apr 26 06:21:31 EDT 2007 -->>>>> 35 Seconds have gone by and spaceCount = 59
    Thu Apr 26 06:21:36 EDT 2007 -->>>>> Received Notification:message received was: entry7
    Thu Apr 26 06:21:36 EDT 2007 -->>>>> 40 Seconds have gone by and spaceCount = 60
    Thu Apr 26 06:21:41 EDT 2007 -->>>>> Received Notification:message received was: entry8
    Thu Apr 26 06:21:41 EDT 2007 -->>>>> 45 Seconds have gone by and spaceCount = 61
    Thu Apr 26 06:21:46 EDT 2007 -->>>>> Received Notification:message received was: entry9
    Thu Apr 26 06:21:46 EDT 2007 -->>>>> 50 Seconds have gone by and spaceCount = 62
    Thu Apr 26 06:21:46 EDT 2007 -->>>>> Deliberately Canceling Notify registration ...
    Thu Apr 26 06:21:46 EDT 2007 -->>>>> Notify registration canceled OK!
    Thu Apr 26 06:21:48 EDT 2007 -->>>>> LeaseRenewalEvent: Can't renew Lease - Lease canceled - Source:net.jini.lease.LeaseRenewalManager@ea443f
    Thu Apr 26 06:21:48 EDT 2007 -->>>>> Re-Register for Notification...
    Thu Apr 26 06:21:48 EDT 2007 -->>>>> Register for Notification OK!
    Thu Apr 26 06:21:48 EDT 2007 -->>>>> Notify Registration Lease originally set to expire in 12 seconds. . .
    Thu Apr 26 06:21:48 EDT 2007 -->>>>> RenewalManager will renew lease for 5 seconds from now
    Thu Apr 26 06:21:52 EDT 2007 -->>>>> Received Notification:message received was: entry10
    Thu Apr 26 06:21:52 EDT 2007 -->>>>> 55 Seconds have gone by and spaceCount = 63
    Thu Apr 26 06:21:57 EDT 2007 -->>>>> Received Notification:message received was: entry11
    Thu Apr 26 06:21:57 EDT 2007 -->>>>> 60 Seconds have gone by and spaceCount = 64
    Thu Apr 26 06:22:02 EDT 2007 -->>>>> Received Notification:message received was: entry12
    Thu Apr 26 06:22:02 EDT 2007 -->>>>> 65 Seconds have gone by and spaceCount = 65
    Thu Apr 26 06:22:07 EDT 2007 -->>>>> Received Notification:message received was: entry13
    Thu Apr 26 06:22:07 EDT 2007 -->>>>> 70 Seconds have gone by and spaceCount = 66
    Thu Apr 26 06:22:12 EDT 2007 -->>>>> Received Notification:message received was: entry14
    Thu Apr 26 06:22:12 EDT 2007 -->>>>> 75 Seconds have gone by and spaceCount = 67
    Thu Apr 26 06:22:12 EDT 2007 -->>>>> Deliberately Canceling Notify registration ...
    Thu Apr 26 06:22:12 EDT 2007 -->>>>> Notify registration canceled OK!
    Thu Apr 26 06:22:12 EDT 2007 -->>>>> LeaseRenewalEvent: Can't renew Lease - Lease canceled - Source:net.jini.lease.LeaseRenewalManager@ea443f
    Thu Apr 26 06:22:12 EDT 2007 -->>>>> Re-Register for Notification...
    Thu Apr 26 06:22:12 EDT 2007 -->>>>> Register for Notification OK!
    Thu Apr 26 06:22:12 EDT 2007 -->>>>> Notify Registration Lease originally set to expire in 12 seconds. . .
    Thu Apr 26 06:22:12 EDT 2007 -->>>>> RenewalManager will renew lease for 5 seconds from now
    Thu Apr 26 06:22:17 EDT 2007 -->>>>> Received Notification:message received was: entry15
    Thu Apr 26 06:22:17 EDT 2007 -->>>>> 80 Seconds have gone by and spaceCount = 68
    Thu Apr 26 06:22:22 EDT 2007 -->>>>> Received Notification:message received was: entry16
    Thu Apr 26 06:22:22 EDT 2007 -->>>>> 85 Seconds have gone by and spaceCount = 69
    Thu Apr 26 06:22:27 EDT 2007 -->>>>> Received Notification:message received was: entry17
    Thu Apr 26 06:22:27 EDT 2007 -->>>>> 90 Seconds have gone by and spaceCount = 70
    Thu Apr 26 06:22:32 EDT 2007 -->>>>> Received Notification:message received was: entry18
    Thu Apr 26 06:22:32 EDT 2007 -->>>>> 95 Seconds have gone by and spaceCount = 71
    End Demo!