Summary: Allows you to perform polling receive operations against the space.
Overview
Life Cycle Events
The polling container life cycle events are described below. You may implement each of these to perform the desired activity.
Configuration
Here is a simple example of a polling event container configuration:
Annotation
<!-- Enable scan for OpenSpaces and Spring components -->
<context:component-scan base-package="com.mycompany"/>

<!-- Enable support for @Polling annotation -->
<os-events:annotation-support />

<os-core:space id="space" url="/./space" />

<os-core:giga-space id="gigaSpace" space="space"/>

@EventDriven @Polling
public class SimpleListener {

    @EventTemplate
    Data unprocessedData() {
        Data template = new Data();
        template.setProcessed(false);
        return template;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        //process Data here
    }
}
Namespace
<os-core:space id="space" url="/./space" />

<os-core:giga-space id="gigaSpace" space="space"/>

<bean id="simpleListener" class="SimpleListener" />

<os-events:polling-container id="eventContainer" giga-space="gigaSpace">

    <os-core:template>
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </os-core:template>

    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="simpleListener"/>
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:polling-container>
Plain XML
<bean id="space" class="org.openspaces.core.space.UrlSpaceFactoryBean">
    <property name="url" value="/./space" />
</bean>

<bean id="gigaSpace" class="org.openspaces.core.GigaSpaceFactoryBean">
    <property name="space" ref="space" />
</bean>

<bean id="simpleListener" class="SimpleListener" />

<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">

    <property name="gigaSpace" ref="gigaSpace" />

    <property name="template">
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </property>

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleListener" />
        </bean>
    </property>
</bean>
Code
GigaSpace gigaSpace = // either create the GigaSpace or get it by injection

// eventCalled is not declared in this snippet; it is assumed to be a flag
// (for example an AtomicBoolean) owned by the calling code
SimplePollingEventListenerContainer pollingEventListenerContainer = new SimplePollingContainerConfigurer(gigaSpace)
        .template(new Data(false))
        .eventListenerAnnotation(new Object() {
            @SpaceDataEvent
            public void eventHappened() {
                eventCalled.set(true);
            }
        }).pollingContainer();

// when needed dispose of the polling container
pollingEventListenerContainer.destroy();
The example above performs single take operations (see below) using the provided template (a Data object with its processed flag set to false). If the take operation succeeds (a value is returned), the SimpleListener is invoked. The operations are performed on the configured GigaSpace bean (in this case, if working in a clustered topology, it is performed directly on the cluster member).
Primary/Backup
The polling event container, by default, performs receive operations only when the relevant space it is working against is in primary mode. When the space is in backup mode, no receive operations are performed. If the space moves from backup mode to primary mode, the receive operations are started.
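The behavior described above (take with a template, invoke the listener only when a value is returned, stay idle while the space is a backup) can be pictured with a small stand-alone sketch. It does not use the OpenSpaces API: `ToySpace` and `ToyPollingLoop` are hypothetical names introduced only for illustration, and the template is modeled as a plain `Predicate`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for a space (not an OpenSpaces class).
final class ToySpace<T> {
    private final List<T> entries = new ArrayList<>();
    private boolean primary = true;

    void write(T entry) { entries.add(entry); }
    void setPrimary(boolean primary) { this.primary = primary; }
    boolean isPrimary() { return primary; }
    int size() { return entries.size(); }

    // Removes and returns the first entry matching the template, or null if none matches.
    T take(Predicate<T> template) {
        Iterator<T> it = entries.iterator();
        while (it.hasNext()) {
            T candidate = it.next();
            if (template.test(candidate)) {
                it.remove();
                return candidate;
            }
        }
        return null;
    }
}

// Hypothetical single polling iteration (not the real container implementation).
final class ToyPollingLoop {
    // Returns true only if an event was taken and handed to the listener.
    static <T> boolean pollOnce(ToySpace<T> space, Predicate<T> template, Consumer<T> listener) {
        if (!space.isPrimary()) {
            return false; // backup mode: no receive operation is attempted
        }
        T event = space.take(template);
        if (event == null) {
            return false; // nothing matched: the listener is not invoked
        }
        listener.accept(event);
        return true;
    }
}
```

In this sketch a backup space is simply skipped; once `setPrimary(true)` is called the next iteration starts taking matching entries again.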
FIFO Grouping
FIFO Grouping is designed to allow efficient processing of events with partial ordering constraints. Instead of maintaining a FIFO queue per class type, it gives you a higher level of granularity by maintaining a FIFO queue according to a specific value of a specific property. For more details see FIFO Grouping.
Concurrent Consumers
By default, the polling event container starts a single thread that performs the receive operations and invokes the event listener. It can be configured to start several concurrent consumer threads and to have an upper limit on the number of concurrent consumer threads. This provides faster processing of events; however, any FIFO behavior that might be configured in the space and/or template is lost. In order to receive events using multiple consumer threads, in the same order they are written to the Space:
Here is an example of a polling container with 3 concurrent consumers and 5 maximum concurrent consumers:
Annotation
@EventDriven @Polling(concurrentConsumers = 3, maxConcurrentConsumers = 5)
public class SimpleListener {

    @EventTemplate
    Data unprocessedData() {
        Data template = new Data();
        template.setProcessed(false);
        return template;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        //process Data here
    }
}
Namespace
<os-events:polling-container id="eventContainer" giga-space="gigaSpace"
        concurrent-consumers="3" max-concurrent-consumers="5">
    <!-- ... -->
</os-events:polling-container>
Plain XML
<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">
    <property name="concurrentConsumers" value="3" />
    <property name="maxConcurrentConsumers" value="5" />
    <!-- ... -->
</bean>
Sometimes, it is very convenient to have a listener instance per concurrent polling thread. This allows a thread-safe instance variable to be constructed without worrying about concurrent access. In such a case, the prototype Spring scope can be used in conjunction with a listenerRef bean name injection. Here is an example:
Namespace
<bean id="listener" class="eg.SimpleListener" scope="prototype" />

<os-events:annotation-adapter id="adapterListener" scope="prototype">
    <os-events:delegate ref="listener"/>
</os-events:annotation-adapter>

<os-events:polling-container id="eventContainer" giga-space="gigaSpace" concurrent-consumers="2">
    <!-- ... -->
    <os-events:listener ref="adapterListener" />
</os-events:polling-container>
Plain XML
<bean id="listener" class="eg.SimpleListener" scope="prototype" />

<bean id="adapterListener" class="org.openspaces.events.adapter.AnnotationEventListenerAdapter" scope="prototype">
    <property name="delegate" ref="listener" />
</bean>

<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">
    <property name="concurrentConsumers" value="2" />
    <property name="eventListenerRef" value="adapterListener" />
    <!-- ... -->
</bean>
Static Template Definition
When performing receive operations, a template is defined, creating a virtualized subset of data within the space that matches it. GigaSpaces supports templates based on the actual domain model (with null values denoting wildcards), which are shown in the examples. GigaSpaces also allows the use of SQLQuery to query the space, and a SQLQuery can be used as the event container template. Here is an example of how it can be defined:
Annotation
@EventDriven @Polling
public class SimpleListener {

    @EventTemplate
    SQLQuery<Data> unprocessedData() {
        SQLQuery<Data> template = new SQLQuery<Data>(Data.class, "processed = true");
        return template;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        //process Data here
    }
}
Namespace
<os-events:polling-container id="eventContainer" giga-space="gigaSpace">

    <os-core:sql-query where="processed = true" class="org.openspaces.example.data.common.Data"/>

    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="simpleListener"/>
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:polling-container>
Plain XML
<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">

    <property name="gigaSpace" ref="gigaSpace" />

    <property name="template">
        <bean class="com.j_spaces.core.client.SQLQuery">
            <!-- first constructor argument: the class to query; second: the where clause -->
            <constructor-arg index="0" value="org.openspaces.example.data.common.Data" />
            <constructor-arg index="1" value="processed = true" />
        </bean>
    </property>

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleListener" />
        </bean>
    </property>
</bean>
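The domain-model templates mentioned at the start of this section treat a null property as a wildcard. A minimal sketch of that matching rule is shown here in plain Java; it is not the space's actual matching code, and `ToyData` and `TemplateMatcher` are hypothetical names used only for illustration.

```java
import java.util.Objects;

// Hypothetical domain object (not the Data class used in the examples).
final class ToyData {
    final String type;       // null in a template means "any type"
    final Boolean processed; // null in a template means "any value"

    ToyData(String type, Boolean processed) {
        this.type = type;
        this.processed = processed;
    }
}

// Hypothetical matcher illustrating null-as-wildcard semantics.
final class TemplateMatcher {
    // A candidate matches when every non-null template property equals the candidate's property.
    static boolean matches(ToyData template, ToyData candidate) {
        return fieldMatches(template.type, candidate.type)
                && fieldMatches(template.processed, candidate.processed);
    }

    private static boolean fieldMatches(Object templateValue, Object candidateValue) {
        return templateValue == null || Objects.equals(templateValue, candidateValue);
    }
}
```

With this rule, a template whose only non-null property is `processed = false` matches every unprocessed object regardless of its other properties, which is what the Data templates in the examples rely on.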
Multiple Values Template
You can use a SQLQuery with the IN operator to register a template that matches multiple values. This can be a simple alternative to using multiple polling containers. See the example below:
The Space Class
import com.gigaspaces.annotation.pojo.SpaceId;
import com.gigaspaces.annotation.pojo.SpaceIndex;

public class MyData {
    String id;
    String key;

    @SpaceId(autoGenerate=false)
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    @SpaceIndex
    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    @Override
    public String toString() {
        return "MyData [id=" + id + ", key=" + key + "]";
    }
}
The Template registration
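The registration code refers to a `query` variable whose definition is not shown on this page. As a hedged illustration only, the where clause of such an IN query might be assembled as follows; the helper name `InClauseBuilder` and the sample values are assumptions, not part of the original example. The resulting string would be passed to the SQLQuery constructor in the same way as the "processed = true" clause in the earlier examples.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: builds a where clause such as  key IN ('A','B','C').
final class InClauseBuilder {
    static String inClause(String property, List<String> values) {
        if (values.isEmpty()) {
            throw new IllegalArgumentException("at least one value is required");
        }
        for (String value : values) {
            // Assumption: values are plain tokens; quoting rules are out of scope for this sketch.
            if (value.contains("'")) {
                throw new IllegalArgumentException("value must not contain a single quote: " + value);
            }
        }
        return values.stream()
                .map(v -> "'" + v + "'")
                .collect(Collectors.joining(",", property + " IN (", ")"));
    }
}
```

For example, `InClauseBuilder.inClause("key", List.of("A", "B", "C"))` yields `key IN ('A','B','C')`, so a single polling container can react to MyData objects with any of the three key values.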
SimplePollingEventListenerContainer pollingEventListenerContainer = new SimplePollingContainerConfigurer(space)
        .template(query)
        .eventListenerAnnotation(new Object() {
            @SpaceDataEvent
            public void eventHappened(MyData data) {
                System.out.println("Polling Container Got matching event! - " + data);
            }
        }).pollingContainer();
Dynamic Template Definition
When performing polling receive operations, a dynamic template can be used. A method providing a dynamic template is called before each receive operation, and can return a different object in each call.
Annotation
@EventDriven @Polling
public class SimpleListener {

    @DynamicEventTemplate
    SQLQuery<Data> unprocessedExpiredData() {
        long expired = System.currentTimeMillis() - 60000;
        SQLQuery<Data> dynamicTemplate = new SQLQuery<Data>(Data.class, "processed = true AND timestamp < " + expired);
        return dynamicTemplate;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        //process Data here
    }
}
Namespace
<os-events:polling-container id="eventContainer" giga-space="gigaSpace">

    <os-events:dynamic-template ref="dynamicTemplate" />

    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="simpleListener"/>
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:polling-container>

<bean id="dynamicTemplate" class="ExpiredDataTemplateProvider"/>

public class ExpiredDataTemplateProvider implements DynamicEventTemplateProvider {

    @Override
    public Object getDynamicTemplate() {
        long expired = System.currentTimeMillis() - 60000;
        SQLQuery<Data> dynamicTemplate = new SQLQuery<Data>(Data.class, "processed = true AND timestamp < " + expired);
        return dynamicTemplate;
    }
}
Plain XML
<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">

    <property name="gigaSpace" ref="gigaSpace" />

    <property name="dynamicTemplate" ref="dynamicTemplate" />

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleListener" />
        </bean>
    </property>
</bean>

<bean id="dynamicTemplate" class="ExpiredDataTemplateProvider"/>

public class ExpiredDataTemplateProvider implements DynamicEventTemplateProvider {

    @Override
    public Object getDynamicTemplate() {
        long expired = System.currentTimeMillis() - 60000;
        SQLQuery<Data> dynamicTemplate = new SQLQuery<Data>(Data.class, "processed = true AND timestamp < " + expired);
        return dynamicTemplate;
    }
}
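The reason the dynamic template is re-evaluated before each receive operation is that the expiry threshold moves with the clock. The stand-alone sketch below illustrates this with an injectable clock so the effect can be observed deterministically; `ExpiredClauseProvider` is a hypothetical name, and it produces only the where clause string rather than a real SQLQuery.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical provider: recomputes the where clause each time it is asked.
final class ExpiredClauseProvider implements Supplier<String> {
    private final LongSupplier clock;   // source of "now" in milliseconds
    private final long maxAgeMillis;    // entries older than this are considered expired

    ExpiredClauseProvider(LongSupplier clock, long maxAgeMillis) {
        this.clock = clock;
        this.maxAgeMillis = maxAgeMillis;
    }

    @Override
    public String get() {
        long expired = clock.getAsLong() - maxAgeMillis;
        return "processed = true AND timestamp < " + expired;
    }
}
```

Passing `System::currentTimeMillis` as the clock and 60000 as the maximum age reproduces the clause built in the examples above; a static template would instead freeze the threshold at the moment the container was configured.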
Receive Operation Handler
The polling receive container performs receive operations. The actual implementation of the receive operation is abstracted using the following interface:
public interface ReceiveOperationHandler {

    /**
     * Performs the actual receive operation. Return values allowed are single object or an array of
     * objects.
     *
     * @param template
     *            The template to use for the receive operation.
     * @param gigaSpace
     *            The GigaSpace interface to perform the receive operations with
     * @param receiveTimeout
     *            Receive timeout value
     * @return The receive result. <code>null</code> indicating no receive occurred. Single object
     *         or an array of objects indicating the receive operation result.
     * @throws DataAccessException
     */
    Object receive(Object template, GigaSpace gigaSpace, long receiveTimeout) throws DataAccessException;
}
OpenSpaces comes with several built-in receive operation-handler implementations:
Here is an example of how the receive operation handler can be configured with MultiTakeReceiveOperationHandler:
Annotation
@EventDriven @Polling
public class SimpleListener {

    @ReceiveHandler
    ReceiveOperationHandler receiveHandler() {
        MultiTakeReceiveOperationHandler receiveHandler = new MultiTakeReceiveOperationHandler();
        receiveHandler.setMaxEntries(100);
        return receiveHandler;
    }

    @EventTemplate
    Data unprocessedData() {
        Data template = new Data();
        template.setProcessed(false);
        return template;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        //process Data here
    }
}
Namespace
<os-core:space id="space" url="/./space" />

<os-core:giga-space id="gigaSpace" space="space"/>

<bean id="simpleListener" class="SimpleListener" />

<os-events:polling-container id="eventContainer" giga-space="gigaSpace">

    <os-events:receive-operation-handler>
        <bean class="org.openspaces.events.polling.receive.MultiTakeReceiveOperationHandler" />
    </os-events:receive-operation-handler>

    <os-core:template>
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </os-core:template>

    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="simpleListener"/>
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:polling-container>
Plain XML
<bean id="space" class="org.openspaces.core.space.UrlSpaceFactoryBean">
    <property name="url" value="/./space" />
</bean>

<bean id="gigaSpace" class="org.openspaces.core.GigaSpaceFactoryBean">
    <property name="space" ref="space" />
</bean>

<bean id="simpleListener" class="SimpleListener" />

<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">

    <property name="gigaSpace" ref="gigaSpace" />

    <property name="receiveOperationHandler">
        <bean class="org.openspaces.events.polling.receive.MultiTakeReceiveOperationHandler" />
    </property>

    <property name="template">
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </property>

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleListener" />
        </bean>
    </property>
</bean>
Non-Blocking Receive Handler
When working with a partitioned cluster and configuring the remote polling container to work against the whole cluster, blocking operations (take with a timeout > 0) are not allowed (when the routing field is not set on the template or SQLQuery). The default receive operation handlers support performing the receive operation in a non-blocking manner, by sleeping between non-blocking operations. For example, the SingleTakeReceiveOperationHandler performs a non-blocking take operation against the space and then sleeps for a configurable amount of time. A classic scenario where the Non-Blocking mode would be used is the Master-Worker Pattern.
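The "take without blocking, then sleep" cycle described above can be sketched in plain Java. This is only an illustration and not the handler's real implementation: `NonBlockingReceive` is a hypothetical name, and the sketch assumes that the sleep between attempts equals the receive timeout divided by a factor giving the number of attempts.

```java
import java.util.function.Supplier;

// Hypothetical illustration of a non-blocking receive cycle.
final class NonBlockingReceive {

    // Assumption: the pause between attempts is receiveTimeout / factor.
    static long sleepMillis(long receiveTimeoutMillis, int nonBlockingFactor) {
        if (nonBlockingFactor <= 0) {
            throw new IllegalArgumentException("nonBlockingFactor must be positive");
        }
        return receiveTimeoutMillis / nonBlockingFactor;
    }

    // Tries the non-blocking take up to 'nonBlockingFactor' times, sleeping after each miss.
    // Returns the first non-null result, or null if every attempt missed or the thread was interrupted.
    static <T> T receive(Supplier<T> nonBlockingTake, long receiveTimeoutMillis, int nonBlockingFactor) {
        long sleep = sleepMillis(receiveTimeoutMillis, nonBlockingFactor);
        for (int attempt = 0; attempt < nonBlockingFactor; attempt++) {
            T result = nonBlockingTake.get();
            if (result != null) {
                return result;
            }
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return null;
            }
        }
        return null;
    }
}
```

Under this assumption, a receive timeout of 10000 milliseconds with a factor of 10 gives ten take attempts separated by pauses of 1000 milliseconds each.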
Here is an example of how a Non-Blocking mode can be configured:
Annotation
@EventDriven @Polling(receiveTimeout = 10000)
public class SimpleListener {

    @ReceiveHandler
    ReceiveOperationHandler receiveHandler() {
        SingleTakeReceiveOperationHandler receiveHandler = new SingleTakeReceiveOperationHandler();
        receiveHandler.setNonBlocking(true);
        receiveHandler.setNonBlockingFactor(10);
        return receiveHandler;
    }

    @EventTemplate
    Data unprocessedData() {
        Data template = new Data();
        template.setProcessed(false);
        return template;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        //process Data here
    }
}
Namespace
<os-core:space id="space" url="/./space" />

<os-core:giga-space id="gigaSpace" space="space"/>

<bean id="simpleListener" class="SimpleListener" />

<os-events:polling-container id="eventContainer" giga-space="gigaSpace" receive-timeout="10000">

    <os-events:receive-operation-handler>
        <bean class="org.openspaces.events.polling.receive.SingleTakeReceiveOperationHandler">
            <property name="nonBlocking" value="true" />
            <property name="nonBlockingFactor" value="10" />
        </bean>
    </os-events:receive-operation-handler>

    <os-core:template>
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </os-core:template>

    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="simpleListener"/>
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:polling-container>
Plain XML
<bean id="space" class="org.openspaces.core.space.UrlSpaceFactoryBean">
    <property name="url" value="/./space" />
</bean>

<bean id="gigaSpace" class="org.openspaces.core.GigaSpaceFactoryBean">
    <property name="space" ref="space" />
</bean>

<bean id="simpleListener" class="SimpleListener" />

<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">

    <property name="gigaSpace" ref="gigaSpace" />

    <property name="receiveTimeout" value="10000" />

    <property name="receiveOperationHandler">
        <bean class="org.openspaces.events.polling.receive.SingleTakeReceiveOperationHandler">
            <property name="nonBlocking" value="true" />
            <property name="nonBlockingFactor" value="10" />
        </bean>
    </property>

    <property name="template">
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </property>

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleListener" />
        </bean>
    </property>
</bean>
The above example uses a receive timeout of 10 seconds (10000 milliseconds). The SingleTakeReceiveOperationHandler is configured to be non-blocking with a non-blocking factor of 10. This means that the receive handler performs 10 non-blocking takes within 10 seconds and sleeps the rest of the time (~1 second each time).
Batch Operations and passArrayAsIs
Processing data in batches may improve processing throughput. Instead of consuming objects one at a time from the space and processing each of them, you may consume a batch of multiple objects and process them in one transaction. This may improve the overall throughput rate, but may impact the latency of processing an individual object. See below simple benchmark results comparing the different options:
You may use batching via the MultiTakeReceiveOperationHandler. The MultiTakeReceiveOperationHandler.setMaxEntries(integer) method allows you to set the maximum number of objects to be consumed with each polling event. If the space does not have a sufficient number of matching objects at the time of polling, the event listener method is called with the matching objects that do exist (fewer than the MaxEntries value). There is no delay in such a case: the polling container does not wait until the exact number of matching objects specified via MaxEntries is available.
Certain receive operation handlers might return an array as a result of the receive operation. A prime example is the MultiTakeReceiveOperationHandler, which might return an array as a result of a takeMultiple operation called by the polling container. By default, the polling container serializes the execution of the array into an invocation of the event listener method for each element in the array. If you want the event to operate on the whole array (receive the array as a parameter into the event listener method), the passArrayAsIs attribute should be set to true.
Here is an example of batch processing using passArrayAsIs. In this example the polling container consumes a batch of objects using takeMultiple, modifies them and writes them back into the space in one operation using writeMultiple:
@EventDriven @Polling(passArrayAsIs = true)
public class SimpleBatchListener {

    @ReceiveHandler
    ReceiveOperationHandler receiveHandler() {
        MultiTakeReceiveOperationHandler receiveHandler = new MultiTakeReceiveOperationHandler();
        receiveHandler.setMaxEntries(100);
        return receiveHandler;
    }

    @EventTemplate
    Data unprocessedData() {
        Data template = new Data();
        template.setProcessed(false);
        return template;
    }

    @SpaceDataEvent
    public Data[] eventListener(Data[] events) {
        //process Data within a loop
        for (int i = 0; i < events.length; i++) {
            events[i].setProcessed(true);
        }
        return events;
    }
}
Free Polling Container Resources
To free the resources used by the polling container, make sure you close it properly. A good life cycle event in which to place the destroy() call is the @PreDestroy or DisposableBean.destroy() method.
Transaction Support
Both the receive operation and the actual event action can be configured to be performed under a transaction. Transaction support is required when, for example, an exception occurs in the event listener, and the receive operation needs to be rolled back (and the actual data event is returned to the space).
Adding transaction support is very simple in the polling container, and can be done by injecting a transaction manager into it. For example:
Annotation
<!-- Enable scan for OpenSpaces and Spring components -->
<context:component-scan base-package="com.mycompany"/>

<!-- Enable support for @Polling annotation -->
<os-events:annotation-support />

<os-core:space id="space" url="/./space" />

<os-core:distributed-tx-manager id="transactionManager" />

<os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/>

@EventDriven @Polling @TransactionalEvent
public class SimpleListener {

    @EventTemplate
    Data unprocessedData() {
        Data template = new Data();
        template.setProcessed(false);
        return template;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        //process Data here
    }
}
Namespace
<os-core:space id="space" url="/./space" />

<os-core:distributed-tx-manager id="transactionManager" />

<os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/>

<bean id="simpleListener" class="SimpleListener" />

<os-events:polling-container id="eventContainer" giga-space="gigaSpace">
    <os-events:tx-support tx-manager="transactionManager"/>

    <os-core:template>
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </os-core:template>

    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="simpleListener"/>
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:polling-container>
Plain XML
<bean id="space" class="org.openspaces.core.space.UrlSpaceFactoryBean">
    <property name="url" value="/./space" />
</bean>

<bean id="transactionManager" class="org.openspaces.core.transaction.manager.DistributedJiniTransactionManager"/>

<bean id="gigaSpace" class="org.openspaces.core.GigaSpaceFactoryBean">
    <property name="space" ref="space" />
    <property name="transactionManager" ref="transactionManager" />
</bean>

<bean id="simpleListener" class="SimpleListener" />

<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">

    <property name="transactionManager" ref="transactionManager" />

    <property name="gigaSpace" ref="gigaSpace" />

    <property name="template">
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </property>

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleListener" />
        </bean>
    </property>
</bean>
Code
GigaSpace gigaSpace = ...//create a GigaSpace instance

//creating a transaction manager. For more details please refer to the Transaction Management section
PlatformTransactionManager ptm = new DistributedJiniTxManagerConfigurer().transactionManager();

//creating a listener
SimpleListener listener = new SimpleListener();

//creating a polling container which will automatically start receiving events from the space
SimplePollingEventListenerContainer pollingContainer = new SimplePollingContainerConfigurer(gigaSpace)
        .template(listener.getTemplate())
        .eventListenerAnnotation(listener)
        .transactionManager(ptm)
        .receiveTimeout(1000)
        .pollingContainer();
When using transactions with a polling container, special care should be taken with timeout values. Transactions started by the polling container can have a timeout value associated with them (if not set, it defaults to the default timeout value of the transaction manager, which is 60 seconds). If setting a specific timeout value, make sure it is higher than the timeout value for blocking operations and includes the expected execution time of the associated listener.
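The timeout guideline above amounts to a simple inequality: the transaction timeout should exceed the blocking receive timeout plus the time the listener is expected to run. The sketch below expresses that check; `TxTimeoutCheck` is a hypothetical helper, and it assumes all three values have already been converted to milliseconds (the units of the actual configuration settings should be checked against the product documentation).

```java
// Hypothetical helper expressing the timeout guideline as arithmetic.
final class TxTimeoutCheck {

    // Smallest budget the transaction must outlive: blocking receive plus listener execution.
    static long requiredMillis(long receiveTimeoutMillis, long expectedListenerMillis) {
        return receiveTimeoutMillis + expectedListenerMillis;
    }

    // True when the transaction timeout is strictly higher than the required budget.
    static boolean isSafe(long txTimeoutMillis, long receiveTimeoutMillis, long expectedListenerMillis) {
        return txTimeoutMillis > requiredMillis(receiveTimeoutMillis, expectedListenerMillis);
    }
}
```

For instance, with a 10000 millisecond receive timeout and a listener expected to run for about 2000 milliseconds, a 60000 millisecond transaction timeout passes the check, while a 12000 millisecond one does not because it leaves no margin.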
Here is an example of how the timeout value (and transaction isolation) can be set with a polling container:
Annotation
<!-- Enable scan for OpenSpaces and Spring components -->
<context:component-scan base-package="com.mycompany"/>

<!-- Enable support for @Polling annotation -->
<os-events:annotation-support />

<os-core:space id="space" url="/./space" />

<os-core:distributed-tx-manager id="transactionManager"/>

<os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/>

@EventDriven @Polling @TransactionalEvent(isolation = Isolation.READ_COMMITTED, timeout = 1000)
public class SimpleListener {

    @EventTemplate
    Data unprocessedData() {
        Data template = new Data();
        template.setProcessed(false);
        return template;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        //process Data here
    }
}
Namespace
<os-core:space id="space" url="/./space" />

<os-core:giga-space id="gigaSpace" space="space"/>

<os-core:distributed-tx-manager id="transactionManager" />

<bean id="simpleListener" class="SimpleListener" />

<os-events:polling-container id="eventContainer" giga-space="gigaSpace">

    <os-events:tx-support tx-manager="transactionManager" tx-timeout="1000" tx-isolation="READ_COMMITTED" />

    <os-core:template>
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </os-core:template>

    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="simpleListener"/>
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:polling-container>
Plain XML
<bean id="space" class="org.openspaces.core.space.UrlSpaceFactoryBean">
    <property name="url" value="/./space" />
</bean>

<bean id="gigaSpace" class="org.openspaces.core.GigaSpaceFactoryBean">
    <property name="space" ref="space" />
</bean>

<bean id="transactionManager" class="org.openspaces.core.transaction.manager.DistributedJiniTransactionManager" />

<bean id="simpleListener" class="SimpleListener" />

<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">

    <property name="gigaSpace" ref="gigaSpace" />

    <property name="transactionManager" ref="transactionManager" />
    <property name="transactionTimeout" value="1000" />
    <property name="transactionIsolationLevelName" value="READ_COMMITTED" />

    <property name="template">
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </property>

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleListener" />
        </bean>
    </property>
</bean>
Trigger Receive Operation
When configuring the polling event container to perform its receive operation and event actions under a transaction, a transaction is started and rolled back for each unsuccessful receive operation, which results in a higher load on the space. The polling event container allows pluggable logic to be used in order to decide if the actual receive operation should be performed or not. This logic, called the trigger receive operation, is performed outside the receive transaction boundaries. The following interface is provided for custom implementation of this logic:
public interface TriggerOperationHandler {

    /**
     * Allows to perform a trigger receive operation which controls if the active receive operation
     * will be performed in a polling event container. This feature is mainly used when having
     * polling event operations with transactions where the trigger receive operation is performed
     * outside of a transaction, thus reducing the creation of transactions that did not perform the
     * actual receive operation.
     *
     * <p>
     * If this operation returns a non <code>null</code> value, it means that the receive
     * operation should take place. If it returns a <code>null</code> value, no receive operation
     * will be attempted.
     *
     * @param template
     *            The template to use for the receive operation.
     * @param gigaSpace
     *            The GigaSpace interface to perform the receive operations with
     * @param receiveTimeout
     *            Receive timeout value
     * @throws DataAccessException
     *
     */
    Object triggerReceive(Object template, GigaSpace gigaSpace, long receiveTimeout) throws DataAccessException;

    /**
     * Controls if the object returned from
     * {@link #triggerReceive(Object,org.openspaces.core.GigaSpace,long)} will be used as the
     * template for the receive operation by returning <code>true</code>. If <code>false</code>
     * is returned, the actual template configured in the polling event container will be used.
     */
    boolean isUseTriggerAsTemplate();
}
OpenSpaces comes with a built-in implementation of this interface, called ReadTriggerOperationHandler. It performs a single blocking read operation (using the provided receive timeout), thus "peeking" into the space for relevant event data. If the read operation returns a value, there is a higher probability that the receive operation will succeed, and the transaction won't be started without a purpose. Here is how it can be configured:
Annotation
@EventDriven @Polling @TransactionalEvent
public class SimpleListener {

    @TriggerHandler
    TriggerOperationHandler receiveHandler() {
        ReadTriggerOperationHandler triggerHandler = new ReadTriggerOperationHandler();
        return triggerHandler;
    }

    @EventTemplate
    Data unprocessedData() {
        Data template = new Data();
        template.setProcessed(false);
        return template;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        //process Data here
    }
}
Namespace
<os-core:space id="space" url="/./space" />

<os-core:giga-space id="gigaSpace" space="space"/>

<os-core:distributed-tx-manager id="transactionManager" />

<bean id="simpleListener" class="SimpleListener" />

<os-events:polling-container id="eventContainer" giga-space="gigaSpace">

    <os-events:tx-support tx-manager="transactionManager"/>

    <os-events:trigger-operation-handler>
        <bean class="org.openspaces.events.polling.trigger.ReadTriggerOperationHandler" />
    </os-events:trigger-operation-handler>

    <os-core:template>
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </os-core:template>

    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="simpleListener"/>
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:polling-container>
Plain XML
<bean id="space" class="org.openspaces.core.space.UrlSpaceFactoryBean">
    <property name="url" value="/./space" />
</bean>

<bean id="gigaSpace" class="org.openspaces.core.GigaSpaceFactoryBean">
    <property name="space" ref="space" />
</bean>

<bean id="transactionManager" class="org.openspaces.core.transaction.manager.DistributedJiniTransactionManager" />

<bean id="simpleListener" class="SimpleListener" />

<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">

    <property name="transactionManager" ref="transactionManager" />

    <property name="gigaSpace" ref="gigaSpace" />

    <property name="triggerOperationHandler">
        <bean class="org.openspaces.events.polling.trigger.ReadTriggerOperationHandler" />
    </property>

    <property name="template">
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </property>

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleListener" />
        </bean>
    </property>
</bean>
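The saving that a trigger operation provides can be illustrated with a small stand-alone sketch: a non-destructive peek runs first, outside any transaction, and a transaction is only begun when the peek finds something. This is not the container's real code; `TriggeredReceive` is a hypothetical name and the transaction is modeled as a simple counter.

```java
import java.util.function.Supplier;

// Hypothetical illustration of "peek first, start a transaction only if needed".
final class TriggeredReceive {
    int transactionsStarted = 0; // stands in for the number of transactions actually begun

    // peek: non-destructive read performed outside the transaction.
    // take: destructive receive performed inside the transaction.
    <T> T receive(Supplier<T> peek, Supplier<T> take) {
        if (peek.get() == null) {
            return null; // nothing visible: no transaction is started at all
        }
        transactionsStarted++; // models beginning the receive transaction
        // The take may still return null if another consumer got there first.
        return take.get();
    }
}
```

Without the peek, every empty poll would start and roll back a transaction; with it, empty polls cost only a read.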
Default Values of Polling Container Configuration Parameters
The default values for all of the polling container configuration parameters, such as concurrent-consumers, active-when-primary, receive-timeout and others, can be found in the JavaDoc (and sources) of the class org.openspaces.events.polling.SimplePollingEventListenerContainer and its super class, org.openspaces.events.polling.AbstractPollingEventListenerContainer.
Schema
The Polling Container schema and complete configuration options are described below: