Summary: The polling container implements the
IEventListenerContainer interface, and allows you to perform polling receive operations against the space.
Overview
The polling event container implements the IEventListenerContainer interface. Its life-cycle consists of performing polling receive operations against the space. If a receive operation succeeds (a value is returned from the receive operation), the DataEventArrived event is invoked. A polling event container is mainly used when simulating queue semantics, or when using the master-worker design pattern. Here is a simple example of polling event container construction:
Using EventListenerContainerFactory
    [PollingEventDriven]
    public class SimpleListener
    {
        [EventTemplate]
        public Data UnprocessedData
        {
            get
            {
                Data template = new Data();
                template.Processed = false;
                return template;
            }
        }

        [DataEventHandler]
        public Data ProcessData(Data inputObject)
        {
            // process Data here and return processed data
        }
    }

Constructing the polling container that uses the SimpleListener class as the event listener, and starting it:

    ISpaceProxy spaceProxy = // either create the SpaceProxy or obtain a reference to it
    IEventListenerContainer<Data> eventListenerContainer = EventListenerContainerFactory.CreateContainer<Data>(spaceProxy, new SimpleListener());
    eventListenerContainer.Start();

    // when needed, dispose of the container
    eventListenerContainer.Dispose();

PollingEventListenerContainer Code Construction

    ISpaceProxy spaceProxy = // either create the SpaceProxy or obtain a reference to it
    PollingEventListenerContainer<Data> pollingEventListenerContainer = new PollingEventListenerContainer<Data>(spaceProxy);
    pollingEventListenerContainer.Template = new Data(false);
    pollingEventListenerContainer.DataEventArrived += new DelegateDataEventArrivedAdapter<Data,Data>(ProcessData).WriteBackDataEventHandler;

    // when needed, dispose of the container
    pollingEventListenerContainer.Dispose();

Event processing method:

    public Data ProcessData(IEventListenerContainer<Data> sender, DataEventArgs<Data> e)
    {
        Data data = e.Data;
        // process Data here and return processed data
    }
The example above performs single take operations (see below), using the provided template, which can be any .NET object (in this case a Data object with its processed flag set to false). If the take operation succeeds (a value is returned), the SimpleListener.ProcessData method is invoked. The operations are performed on the supplied space proxy.
Primary/Backup
The polling event container performs receive operations only when the relevant space it is working against is in primary mode. When the space is in backup mode, no receive operations are performed. If the space moves from backup mode to primary mode, the receive operations are started.
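The life-cycle described in the overview (receive, raise the event on success, write the listener's result back) can be illustrated with a minimal, self-contained sketch. It does not use the XAP.NET API: the space is simulated with an in-memory list, and the Data class, Take and PollOnce are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the space entry used on this page.
public class Data
{
    public int Id;
    public bool Processed;
}

public static class PollingLoopSketch
{
    // Simulated non-blocking take: removes and returns the first entry whose
    // Processed flag matches the template, or null when nothing matches.
    public static Data Take(List<Data> space, Data template)
    {
        int index = space.FindIndex(d => d.Processed == template.Processed);
        if (index < 0) return null;
        Data taken = space[index];
        space.RemoveAt(index);
        return taken;
    }

    // One polling cycle: receive, invoke the listener on success, and write
    // the listener's result back. Returns true when an event was delivered.
    public static bool PollOnce(List<Data> space, Data template, Func<Data, Data> listener)
    {
        Data received = Take(space, template);
        if (received == null) return false; // a null result raises no event
        Data result = listener(received);
        if (result != null) space.Add(result);
        return true;
    }

    public static void Main()
    {
        var space = new List<Data> { new Data { Id = 1 }, new Data { Id = 2 } };
        var template = new Data { Processed = false };
        int events = 0;
        while (PollOnce(space, template, d => { d.Processed = true; return d; })) events++;
        Console.WriteLine(events + " events, " + space.Count + " entries in the space");
    }
}
```

Running Main prints "2 events, 2 entries in the space": each unprocessed entry is taken once, marked as processed by the listener, and written back, after which the template no longer matches anything.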
FIFO Grouping
FIFO Grouping is designed to allow efficient processing of events with partial ordering constraints. Instead of maintaining a FIFO queue per class type, it provides finer granularity by maintaining a FIFO queue according to a specific value of a specific property. For more details see FIFO Grouping.
Concurrent Consumers
By default, the polling event container starts a single thread that performs the receive operations and invokes the event listener. It can be configured to start several concurrent consumer threads, with an upper limit on their number. The container scales the number of concurrent consumers up and down automatically according to the load; the parameters that control this scaling logic are described in Auto Polling Consumer Scaling. Concurrent consumers provide faster processing of events. However, any FIFO behavior that might be configured in the space and/or template is lost. Here is an example of a polling container with 3 concurrent consumers and a maximum of 5 concurrent consumers:
Using EventListenerContainerFactory
    [PollingEventDriven(MinConcurrentConsumers = 3, MaxConcurrentConsumers = 5)]
    public class SimpleListener
    {
        [EventTemplate]
        public Data UnprocessedData
        {
            get
            {
                Data template = new Data();
                template.Processed = false;
                return template;
            }
        }

        // "event" is a reserved word in C#, so the parameter is named "data"
        [DataEventHandler]
        public Data ProcessData(Data data)
        {
            // process Data here and return processed data
        }
    }

PollingEventListenerContainer Code Construction

    PollingEventListenerContainer<Data> pollingEventListenerContainer = // create or obtain a reference to a polling container
    pollingEventListenerContainer.MinConcurrentConsumers = 3;
    pollingEventListenerContainer.MaxConcurrentConsumers = 5;
Sometimes, it is very convenient to have a listener instance per concurrent polling thread. This allows a thread-safe instance variable to be constructed, without worrying about concurrent access. In such a case, the event listener containing class should implement System.ICloneable, and the CloneEventListenersPerThread property should be set to true. Here is an example:
Using EventListenerContainerFactory
    [PollingEventDriven(MinConcurrentConsumers = 3, MaxConcurrentConsumers = 5, CloneEventListenersPerThread = true)]
    public class SimpleListener : ICloneable
    {
        ...
    }

PollingEventListenerContainer Code Construction

    PollingEventListenerContainer<Data> pollingEventListenerContainer = // create or obtain a reference to a polling container
    pollingEventListenerContainer.MinConcurrentConsumers = 3;
    pollingEventListenerContainer.MaxConcurrentConsumers = 5;
    pollingEventListenerContainer.CloneEventListenersPerThread = true;

Static Template Definition
When performing receive operations, a template is defined, creating a virtualized subset of data within the space that matches it. GigaSpaces supports templates based on the actual domain model (with null values denoting wildcards), as shown in the examples above. GigaSpaces also allows the use of SqlQuery to query the space, and an SqlQuery can be used as the event container's template. Here is an example of how it can be defined:
Using EventListenerContainerFactory
    [PollingEventDriven]
    public class SimpleListener
    {
        [EventTemplate]
        public SqlQuery<Data> UnprocessedData
        {
            get
            {
                SqlQuery<Data> templateQuery = new SqlQuery<Data>("Processed = false");
                return templateQuery;
            }
        }

        [DataEventHandler]
        public Data ProcessData(Data data)
        {
            // process Data here and return processed data
        }
    }

PollingEventListenerContainer Code Construction

    PollingEventListenerContainer<Data> pollingEventListenerContainer = // create or obtain a reference to a polling container
    pollingEventListenerContainer.Template = new SqlQuery<Data>("Processed = false");

Dynamic Template Definition
When performing polling receive operations, a dynamic template can be used. A method providing a dynamic template is called before each receive operation, and can return a different object in each call.
Using EventListenerContainerFactory
    [PollingEventDriven]
    public class SimpleListener
    {
        [DynamicEventTemplate]
        public SqlQuery<Data> UnprocessedExpiredData
        {
            get
            {
                // Current time in milliseconds minus one minute. This assumes Timestamp holds
                // milliseconds since the Unix epoch; DateTime.Now.Millisecond would return only
                // the 0-999 millisecond component of the current time.
                long expired = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalMilliseconds - 60000;
                SqlQuery<Data> dynamicTemplate = new SqlQuery<Data>("Processed = true AND Timestamp < " + expired);
                return dynamicTemplate;
            }
        }

        [DataEventHandler]
        public Data EventListener(Data data)
        {
            // process Data here
        }
    }

PollingEventListenerContainer Code Construction

    PollingEventListenerContainer<Data> pollingEventListenerContainer = // create or obtain a reference to a polling container
    pollingEventListenerContainer.DynamicTemplate = new ExpiredDataTemplateProvider().GetDynamicTemplate;
    ...

    public class ExpiredDataTemplateProvider
    {
        public SqlQuery<Data> GetDynamicTemplate()
        {
            // Same assumption as above: Timestamp holds milliseconds since the Unix epoch.
            long expired = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalMilliseconds - 60000;
            SqlQuery<Data> dynamicTemplate = new SqlQuery<Data>("Processed = true AND Timestamp < " + expired);
            return dynamicTemplate;
        }
    }
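Because the dynamic template is rebuilt before every receive operation, the cutoff value in the query has to be computed from the current time on each call. The following is a minimal sketch of that computation in plain .NET, with no XAP.NET dependency. It assumes that the Timestamp property holds milliseconds since the Unix epoch, which this page does not state; CutoffMillis and BuildQuery are hypothetical helper names. Note that DateTime.Millisecond returns only the millisecond component (0 to 999) of a time, so it cannot serve as a "current time in milliseconds" value.

```csharp
using System;

public static class ExpiredCutoffSketch
{
    private static readonly DateTime UnixEpoch =
        new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    // Milliseconds since the Unix epoch for the given UTC time, minus maxAgeMs.
    // Assumption: the entry's Timestamp uses the same epoch and unit.
    public static long CutoffMillis(DateTime utcNow, long maxAgeMs)
    {
        return (long)(utcNow - UnixEpoch).TotalMilliseconds - maxAgeMs;
    }

    // Builds the query text that a dynamic template method could return on each call.
    public static string BuildQuery(DateTime utcNow, long maxAgeMs)
    {
        return "Processed = true AND Timestamp < " + CutoffMillis(utcNow, maxAgeMs);
    }

    public static void Main()
    {
        // Entries older than one minute match the generated query.
        Console.WriteLine(BuildQuery(DateTime.UtcNow, 60000));
    }
}
```

Passing the current time in as a parameter keeps the computation easy to check with fixed inputs; a dynamic template method would simply pass DateTime.UtcNow.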
Receive Operation Handler
The polling receive container performs receive operations. The actual implementation of the receive operation is abstracted using the following interface:

    public interface IReceiveOperationHandler<TData>
    {
        /// <summary>
        /// Performs the actual receive operation. Return values allowed are single object or an array of objects.
        /// </summary>
        /// <param name="template">The template for the receive operation.</param>
        /// <param name="proxy">The proxy to execute the operation on.</param>
        /// <param name="tx">Operation's transaction context, can be null.</param>
        /// <param name="receiveTimeout">Operation's receive timeout</param>
        /// <returns>An object that is passed to the event listener, null result doesn't trigger an event.</returns>
        TData Receive(IQuery<TData> template, ISpaceProxy proxy, ITransaction tx, long receiveTimeout);

        /// <summary>
        /// Performs the actual receive operation. Return value is an array of data objects.
        /// </summary>
        /// <param name="template">The template for the receive operation.</param>
        /// <param name="proxy">The proxy to execute the operation on.</param>
        /// <param name="tx">Operation's transaction context, can be null.</param>
        /// <param name="receiveTimeout">Operation's receive timeout</param>
        /// <param name="batchSize">Operation's batch size</param>
        /// <returns>A batch of objects that will be passed to the event listener, null result doesn't trigger an event.</returns>
        TData[] ReceiveBatch(IQuery<TData> template, ISpaceProxy proxy, ITransaction tx, long receiveTimeout, int batchSize);
    }

XAP.NET comes with several built-in receive operation handler implementations, including the TakeReceiveOperationHandler, ReadReceiveOperationHandler and ExclusiveReadReceiveOperationHandler used on this page.
Here is an example of how the receive operation handler can be configured with ExclusiveReadReceiveOperationHandler:
Using EventListenerContainerFactory
    [PollingEventDriven]
    public class SimpleListener
    {
        [ReceiveHandler]
        public IReceiveOperationHandler<Data> ReceiveHandler()
        {
            ExclusiveReadReceiveOperationHandler<Data> receiveHandler = new ExclusiveReadReceiveOperationHandler<Data>();
            return receiveHandler;
        }

        [EventTemplate]
        public Data UnprocessedData
        {
            get
            {
                Data template = new Data();
                template.Processed = false;
                return template;
            }
        }

        [DataEventHandler]
        public Data ProcessData(Data data)
        {
            // process Data here and return processed data
        }
    }

PollingEventListenerContainer Code Construction

    PollingEventListenerContainer<Data> pollingEventListenerContainer = // create or obtain a reference to a polling container
    ExclusiveReadReceiveOperationHandler<Data> receiveHandler = new ExclusiveReadReceiveOperationHandler<Data>();
    pollingEventListenerContainer.ReceiveOperationHandler = receiveHandler;

Non-Blocking Receive Handler
When working with a partitioned cluster, and configuring the polling container to work against the whole cluster, blocking operations are not allowed (when the routing index is not set in the template). The default receive operation handlers support performing the receive operation in a non-blocking manner, by sleeping between non-blocking operations. For example, the TakeReceiveOperationHandler performs a non-blocking Take operation against the space, and then sleeps for a configurable amount of time. Here is an example of how it can be configured:
Using EventListenerContainerFactory
    [PollingEventDriven]
    public class SimpleListener
    {
        [ReceiveHandler]
        public IReceiveOperationHandler<Data> ReceiveHandler()
        {
            TakeReceiveOperationHandler<Data> receiveHandler = new TakeReceiveOperationHandler<Data>();
            receiveHandler.NonBlocking = true;
            receiveHandler.NonBlockingFactor = 10;
            return receiveHandler;
        }

        [EventTemplate]
        public Data UnprocessedData
        {
            get
            {
                Data template = new Data();
                template.Processed = false;
                return template;
            }
        }

        [DataEventHandler]
        public Data ProcessData(Data data)
        {
            // process Data here and return processed data
        }
    }

PollingEventListenerContainer Code Construction

    PollingEventListenerContainer<Data> pollingEventListenerContainer = // create or obtain a reference to a polling container
    TakeReceiveOperationHandler<Data> receiveHandler = new TakeReceiveOperationHandler<Data>();
    receiveHandler.NonBlocking = true;
    receiveHandler.NonBlockingFactor = 10;
    pollingEventListenerContainer.ReceiveOperationHandler = receiveHandler;

The above example assumes a receive timeout of 10 seconds (10000 milliseconds). The TakeReceiveOperationHandler is configured to be non-blocking, with a non-blocking factor of 10. This means that the receive handler performs 10 non-blocking takes within 10 seconds, and sleeps the rest of the time (about 1 second between takes).
Batch Events
Sometimes it is better to use batch events, for instance to reduce network traffic. This is done by subscribing to the BatchDataEventArrived event, which receives a batch of event data objects in one invocation. A prime example is the TakeReceiveOperationHandler, which, when the BatchDataEventArrived event is used, returns an array as the result of a TakeMultiple operation. The batch size is determined by the ReceiveBatchSize configuration attribute or property, which is set in the same way as the properties shown above. Here is an example of batch notifications using ReadReceiveOperationHandler:
Using EventListenerContainerFactory
    [PollingEventDriven(ReceiveBatchSize = 100)]
    public class SimpleListener
    {
        [ReceiveHandler]
        public IReceiveOperationHandler<Data> ReceiveHandler()
        {
            ReadReceiveOperationHandler<Data> receiveHandler = new ReadReceiveOperationHandler<Data>();
            return receiveHandler;
        }

        [EventTemplate]
        public Data UnprocessedData
        {
            get
            {
                Data template = new Data();
                template.Processed = false;
                return template;
            }
        }

        [DataEventHandler]
        public Data[] ProcessData(Data[] batch)
        {
            // process batch Data here and return processed data
        }
    }

PollingEventListenerContainer Code Construction

    PollingEventListenerContainer<Data> pollingEventListenerContainer = // create or obtain a reference to a polling container
    ReadReceiveOperationHandler<Data> receiveHandler = new ReadReceiveOperationHandler<Data>();
    pollingEventListenerContainer.ReceiveOperationHandler = receiveHandler;
    pollingEventListenerContainer.ReceiveBatchSize = 100;
    pollingEventListenerContainer.Template = new Data(false);
    pollingEventListenerContainer.BatchDataEventArrived += new DelegateDataEventArrivedAdapter<Data,Data[]>(ProcessData).WriteBackBatchDataEventHandler;

    // when needed, dispose of the container
    pollingEventListenerContainer.Dispose();

Transaction Support
Both the receive operation and the actual event action can be configured to be performed under a transaction. Transaction support is required, for example, when an exception occurs in the event listener and the receive operation needs to be rolled back (so that the actual data event is returned to the space). Adding transaction support to the polling container is done by setting the TransactionType property. There are two transaction types: Distributed and Manual.
Using EventListenerContainerFactory
[PollingEventDriven]
[TransactionalEvent(TransactionType = TransactionType.Distributed)]
public class SimpleListener
{
...
}
PollingEventListenerContainer Code Construction

    ISpaceProxy spaceProxy = // either create the SpaceProxy or obtain a reference to it
    PollingEventListenerContainer<Data> pollingEventListenerContainer = new PollingEventListenerContainer<Data>(spaceProxy);
    pollingEventListenerContainer.TransactionType = TransactionType.Distributed;

When using transactions with the polling container, special care should be taken with timeout values. Transactions started by the polling container can have a timeout value associated with them (if this is not set, it defaults to the default timeout value of the transaction manager). When setting a specific timeout value, make sure it is higher than the timeout value for blocking operations, and that it includes the expected execution time of the associated listener. Here is an example of how the timeout value can be set with the polling container:
Using EventListenerContainerFactory
[PollingEventDriven]
[TransactionalEvent(TransactionType = TransactionType.Distributed, TransactionTimeout = 1000)]
public class SimpleListener
{
...
}
PollingEventListenerContainer Code Construction

    PollingEventListenerContainer<Data> pollingEventListenerContainer = // create or obtain a reference to a polling container
    pollingEventListenerContainer.TransactionTimeout = 1000;
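The timeout guidance above amounts to a simple inequality: the transaction timeout must exceed the blocking receive timeout plus the expected listener execution time. The sketch below expresses this as a sanity check in plain C#. It is not part of the XAP.NET API; the helper names are hypothetical, and it assumes all three values are expressed in the same unit (milliseconds here).

```csharp
using System;

public static class TransactionTimeoutSketch
{
    // True when the transaction timeout leaves room for a full blocking
    // receive followed by the listener's expected execution time.
    public static bool IsSafe(long transactionTimeoutMs, long receiveTimeoutMs, long expectedListenerMs)
    {
        return transactionTimeoutMs > receiveTimeoutMs + expectedListenerMs;
    }

    // Smallest timeout that satisfies IsSafe, plus an extra safety margin.
    public static long Minimum(long receiveTimeoutMs, long expectedListenerMs, long safetyMarginMs)
    {
        return receiveTimeoutMs + expectedListenerMs + 1 + safetyMarginMs;
    }

    public static void Main()
    {
        // A 1000 ms transaction timeout cannot cover a 10000 ms blocking receive.
        Console.WriteLine(IsSafe(1000, 10000, 200));
        // A timeout derived from the receive timeout and listener time can.
        Console.WriteLine(IsSafe(Minimum(10000, 200, 500), 10000, 200));
    }
}
```

A check like this can be run once at configuration time, before the container is started, so that a transaction does not expire while a blocking receive is still waiting.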
It is possible to receive a reference to the ongoing transaction as part of the event handling method. This is useful, for instance, when the event handling performs extra space operations that should be executed under the same transaction context and rolled back upon failure. This is done by adding a transaction parameter to the event handler method. For example:
Using EventListenerContainerFactory
    [PollingEventDriven]
    [TransactionalEvent(TransactionType = TransactionType.Distributed)]
    public class SimpleListener
    {
        ...

        [DataEventHandler]
        public Data ProcessData(Data data, ISpaceProxy spaceProxy, ITransaction transaction)
        {
            // process Data here and return processed data
        }
    }

PollingEventListenerContainer Code Construction

    ISpaceProxy spaceProxy = // either create the SpaceProxy or obtain a reference to it
    PollingEventListenerContainer<Data> pollingEventListenerContainer = new PollingEventListenerContainer<Data>(spaceProxy);
    pollingEventListenerContainer.TransactionType = TransactionType.Distributed;
    pollingEventListenerContainer.DataEventArrived += new DelegateDataEventArrivedAdapter<Data,Data>(ProcessData).WriteBackDataEventHandler;
Trigger Receive Operation
When configuring the polling event container to perform its receive operation and event actions under a transaction, a transaction is started and committed for each unsuccessful receive operation, which results in a higher load on the space. The polling event container allows pluggable logic to be used in order to decide whether the actual receive operation should be performed or not. This logic, called the trigger receive operation, is performed outside the receive transaction boundaries. The following interface is provided for custom implementation of this logic:

    public interface ITriggerOperationHandler<TData>
    {
        /// <summary>
        /// Allows you to perform a trigger receive operation, which controls if the active receive operation
        /// is performed in a polling event container. This feature is mainly used when having polling event
        /// operations with transactions where the trigger receive operation is performed outside of a
        /// transaction, thus reducing the creation of transactions that did not perform the actual receive
        /// operation.
        ///
        /// If this method returns a non null value, it means
        /// that the receive operation should take place. If it returns a null value, no receive operation is
        /// attempted, thus no transaction is created.
        /// </summary>
        /// <param name="template">The template for the receive operation.</param>
        /// <param name="proxy">The proxy to execute the operation on.</param>
        /// <param name="receiveTimeout">Operation's receive timeout</param>
        /// <returns>Null value when the receive operation should not be triggered, otherwise a non null value that can also
        /// be used as the receive template if <see cref="UseTriggerAsTemplate"/> is set to true.</returns>
        IQuery<TData> TriggerReceive(IQuery<TData> template, ISpaceProxy proxy, long receiveTimeout);

        /// <summary>
        /// Gets whether the object returned from the <see cref="TriggerReceive"/> operation should be used as the
        /// receive template instead of the configured template.
        /// </summary>
        bool UseTriggerAsTemplate { get; }
    }

XAP.NET comes with a built-in implementation of this interface, called ReadTriggerOperationHandler. It performs a single blocking Read operation (using the provided receive timeout), thus "peeking" into the space for relevant event data. If the Read operation returns a value, there is a higher probability that the receive operation will succeed, so a transaction is less likely to be started for nothing. Here is how it can be configured:
Using EventListenerContainerFactory
    [PollingEventDriven]
    public class SimpleListener
    {
        [TriggerHandler]
        public ITriggerOperationHandler<Data> TriggerHandler()
        {
            ReadTriggerOperationHandler<Data> triggerHandler = new ReadTriggerOperationHandler<Data>();
            return triggerHandler;
        }

        [EventTemplate]
        public Data UnprocessedData
        {
            get
            {
                Data template = new Data();
                template.Processed = false;
                return template;
            }
        }

        [DataEventHandler]
        public Data ProcessData(Data data)
        {
            // process Data here and return processed data
        }
    }

PollingEventListenerContainer Code Construction

    PollingEventListenerContainer<Data> pollingEventListenerContainer = // create or obtain a reference to a polling container
    ReadTriggerOperationHandler<Data> triggerHandler = new ReadTriggerOperationHandler<Data>();
    // property name assumed by analogy with ReceiveOperationHandler; check the API docs
    pollingEventListenerContainer.TriggerOperationHandler = triggerHandler;

Non-Blocking Trigger Handler
The ReadTriggerOperationHandler can be set to be non-blocking, in the same way as described in Non-Blocking Receive Handler.
Handling Exceptions
During the life-cycle of the polling container, two types of exceptions might be thrown:
The User Exception is an exception that occurs during the invocation of the user event listener. The Container Exception is an exception that occurs anywhere else during the life-cycle of the container (e.g. during the receive or trigger operation handler).
Subscribing to Container Exception Occurred Event
It is possible to be notified when a container exception occurs, and to get a reference to the exception, by subscribing to the ContainerExceptionOccured event. Here is an example of how to subscribe to this event:
Using EventListenerContainerFactory
    [PollingEventDriven]
    public class SimpleListener
    {
        [ContainerException]
        public void ExceptionHandler(IEventListenerContainer<Data> sender, ContainerExceptionEventArgs e)
        {
            Console.WriteLine("Container Name: " + ((IEventListenerContainer<Data>)sender).Name);
            Console.WriteLine(e.Exception.Message);
        }

        ...
    }

PollingEventListenerContainer Code Construction

    PollingEventListenerContainer<Data> pollingEventListenerContainer = // create or obtain a reference to a polling container
    pollingEventListenerContainer.ContainerExceptionOccured += ExceptionHandler;

    public void ExceptionHandler(object sender, ContainerExceptionEventArgs e)
    {
        Console.WriteLine("Container Name: " + ((IEventListenerContainer<Data>)sender).Name);
        Console.WriteLine(e.Exception.Message);
    }

Subscribing to User Exception Occurred Event
It is possible to be notified when a user exception occurs, by subscribing to the UserExceptionOccured event. The arguments of this event contain the entire DataEventArgs of the original DataEventArrived event. By default, any exception that is thrown inside the event listener scope results in a transaction rollback, if the container is set to be transactional. This can be overridden if the user exception handler sets the event state to ignored. Here is an example of how to subscribe to this event:
Using EventListenerContainerFactory
    [PollingEventDriven]
    public class SimpleListener
    {
        [UserException]
        public void ExceptionHandler(IEventListenerContainer<Data> sender, UserExceptionEventArgs<Data> e)
        {
            if (e.Exception is MySpecialException)
                e.Ignore = true;
        }

        ...
    }

PollingEventListenerContainer Code Construction

    PollingEventListenerContainer<Data> pollingEventListenerContainer = // create or obtain a reference to a polling container
    pollingEventListenerContainer.UserExceptionOccured += ExceptionHandler;

    public void ExceptionHandler(object sender, UserExceptionEventArgs<Data> e)
    {
        if (e.Exception is MySpecialException)
            e.Ignore = true;
    }

Default Values of Polling Container Configuration Parameters
The default values for all of the polling container properties, such as min-concurrent-consumers, receive-operation-handler, receive-timeout and others, can be found in the API docs. Each property has a corresponding Default<property name> const field that sets the default value of the property.