GigaSpaces XAP 8.0 API
java.lang.Object
  org.openspaces.events.AbstractSpaceListeningContainer
    org.openspaces.events.AbstractEventListenerContainer
      org.openspaces.events.AbstractTemplateEventListenerContainer
        org.openspaces.events.AbstractTransactionalEventListenerContainer
          org.openspaces.events.polling.AbstractPollingEventListenerContainer
            org.openspaces.events.polling.SimplePollingEventListenerContainer
public class SimplePollingEventListenerContainer
Event listener container variant that uses the plain Space take API, specifically a loop of GigaSpace.take(Object,long) calls that also allow for transactional reception of messages.
Actual event listener execution happens in asynchronous work units which are created through Spring's TaskExecutor abstraction. By default, the specified number of invoker tasks will be created on startup, according to the "concurrentConsumers" setting. Specify an alternative TaskExecutor to integrate with an existing thread pool facility (such as a J2EE server's), for example using a CommonJ WorkManager.
Event reception and listener execution can automatically be wrapped in transactions through passing a Spring PlatformTransactionManager into the "transactionManager" property. This will usually be a LocalJiniTransactionManager.
Dynamic scaling of the number of concurrent invokers can be activated through specifying a "maxConcurrentConsumers" value that is higher than the "concurrentConsumers" value. Since the latter's default is 1, you can also simply specify a "maxConcurrentConsumers" of e.g. 5, which will lead to dynamic scaling up to 5 concurrent consumers in case of increasing event load, as well as dynamic shrinking back to the standard number of consumers once the load decreases. Consider adapting the "idleTaskExecutionLimit" setting to control the lifespan of each new task, to avoid frequent scaling up and down. Note that using more than one consumer might break FIFO behavior if FIFO is configured by the space or the specific class type.
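The take loop described above can be pictured with a small, library-free model. This is only a sketch under stated assumptions: EventSource, EventHandler and TakeLoopSketch are hypothetical names standing in for the space, the event listener and one invoker task. They are not part of the OpenSpaces API, and the real container additionally deals with templates, transactions and error handling.

```java
/** Hypothetical stand-in for a blocking take with a timeout; returns null if nothing arrived in time. */
interface EventSource {
    Object take(long timeoutMillis);
}

/** Hypothetical stand-in for the event listener callback. */
interface EventHandler {
    void onEvent(Object event);
}

final class TakeLoopSketch {
    /**
     * Makes up to maxAttempts take calls and passes every non-null result to the handler.
     * Returns the number of events that were handled.
     */
    static int runTask(EventSource source, EventHandler handler, long timeoutMillis, int maxAttempts) {
        int handled = 0;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Object event = source.take(timeoutMillis); // waits at most timeoutMillis
            if (event != null) {
                handler.onEvent(event);
                handled++;
            }
        }
        return handled;
    }
}
```

A task that returns 0 from runTask received nothing during its whole execution, which is the situation the "idleTaskExecutionLimit" setting is concerned with.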
Field Summary | |
---|---|
static long | DEFAULT_RECOVERY_INTERVAL: The default recovery interval: 5000 ms = 5 seconds. |
static String | DEFAULT_THREAD_NAME_PREFIX: Default thread name prefix: "DefaultPollingEventListenerContainer-". |
Fields inherited from class org.openspaces.events.polling.AbstractPollingEventListenerContainer |
---|
DEFAULT_RECEIVE_TIMEOUT |
Fields inherited from class org.openspaces.events.AbstractEventListenerContainer |
---|
exceptionHandler, failedEvents, processedEvents |
Fields inherited from class org.openspaces.events.AbstractSpaceListeningContainer |
---|
beanName, logger |
Constructor Summary | |
---|---|
SimplePollingEventListenerContainer() |
Method Summary | |
---|---|
protected TaskExecutor | createDefaultTaskExecutor(): Create a default TaskExecutor. |
protected void | doAfterStart() |
protected void | doBeforeStop() |
protected void | doInitialize(): A callback to perform custom initialization steps. |
protected void | doRescheduleTask(Object task): Re-executes the given task via this listener container's TaskExecutor. |
protected void | doShutdown(): Destroy the container by waiting for all the current event listeners to shut down. |
protected void | eventReceived(Object event): Template method that gets called right when a new message has been received, before attempting to process it. |
int | getActiveConsumerCount(): Return the number of currently active consumers. |
int | getConcurrentConsumers(): Return the "concurrentConsumers" setting. |
int | getIdleTaskExecutionLimit(): Return the limit for idle executions of a receive task. |
int | getMaxConcurrentConsumers(): Return the "maxConcurrentConsumers" setting. |
int | getMaxEventsPerTask(): Return the maximum number of events to process in one task. |
String | getName() |
int | getScheduledConsumerCount(): Return the number of currently scheduled consumers. |
ServiceDetails[] | getServicesDetails(): Returns one or more service details that the service exposes. |
ServiceMonitors[] | getServicesMonitors(): Returns one or more service monitors that the service exposes. |
protected void | handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered): Handle the given exception that arose during setup of a listener. |
void | initialize(): Initialize this container. |
void | process(com.gigaspaces.internal.dump.InternalDump dump) |
protected void | recoverAfterListenerSetupFailure(): Recover this listener container after a listener failed to set itself up, for example by re-establishing the underlying connection. |
protected void | refreshConnectionUntilSuccessful(): Refresh the underlying connection, not returning before an attempt has been successful. |
protected void | scheduleNewInvokerIfAppropriate(): Schedule a new invoker, increasing the total number of scheduled invokers for this listener container, but only if the specified "maxConcurrentConsumers" limit has not been reached yet, and only if this listener container does not currently have idle invokers that are already waiting for new messages. |
void | setConcurrentConsumers(int concurrentConsumers): Specify the number of concurrent consumers to create. |
void | setIdleTaskExecutionLimit(int idleTaskExecutionLimit): Specify the limit for idle executions of a receive task, not having received any event within its execution. |
void | setMaxConcurrentConsumers(int maxConcurrentConsumers): Specify the maximum number of concurrent consumers to create. |
void | setMaxEventsPerTask(int maxEventsPerTask): Specify the maximum number of events to process in one task. |
void | setRecoveryInterval(long recoveryInterval): Specify the interval between recovery attempts, in milliseconds. |
void | setTaskExecutor(TaskExecutor taskExecutor): Set the Spring TaskExecutor to use for running the listener threads. |
protected void | sleepInbetweenRecoveryAttempts(): Sleep according to the specified recovery interval. |
Methods inherited from class org.openspaces.events.polling.AbstractPollingEventListenerContainer |
---|
doReceiveAndExecute, getReceiveOperationHandler, getReceiveTimeout, getTriggerOperationHandler, isPassArrayAsIs, receiveAndExecute, receiveEvent, setPassArrayAsIs, setReceiveOperationHandler, setReceiveTimeout, setTriggerOperationHandler |
Methods inherited from class org.openspaces.events.AbstractTemplateEventListenerContainer |
---|
afterPropertiesSet, getReceiveTemplate, getTemplate, isPerformSnapshot, setPerformSnapshot, setTemplate |
Methods inherited from class org.openspaces.events.AbstractEventListenerContainer |
---|
doStart, executeListener, getActualEventListener, getApplicationContext, getEventListener, getEventListenerClass, getExceptionHandler, getFailedEvents, getProcessedEvents, handleListenerException, invokeExceptionListener, invokeListener, setApplicationContext, setEventListener, setEventListenerRef, setExceptionHandler |
Methods inherited from class org.openspaces.events.AbstractSpaceListeningContainer |
---|
destroy, doStop, getBeanName, getGigaSpace, getStatus, isActive, isRunning, message, onApplicationEvent, rescheduleTaskIfNecessary, setActiveWhenPrimary, setAutoStart, setBeanName, setGigaSpace, setRegisterSpaceModeListener, shutdown, start, stop, waitWhileNotRunning |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Field Detail |
---|
public static final String DEFAULT_THREAD_NAME_PREFIX
public static final long DEFAULT_RECOVERY_INTERVAL
Constructor Detail |
---|
public SimplePollingEventListenerContainer()
Method Detail |
---|
public void setTaskExecutor(TaskExecutor taskExecutor)
Set the Spring TaskExecutor to use for running the listener threads. Default is SimpleAsyncTaskExecutor, starting up a number of new threads, according to the specified number of concurrent consumers.
Specify an alternative TaskExecutor for integration with an existing thread pool. Note that this only adds value if the threads are managed in a specific fashion, for example within a J2EE environment. A plain thread pool does not add much value, as this listener container will occupy a number of threads for its entire lifetime.
See Also: setConcurrentConsumers(int), SimpleAsyncTaskExecutor
public void setRecoveryInterval(long recoveryInterval)
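Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds.
See Also: handleListenerSetupFailure(java.lang.Throwable, boolean)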
public void setConcurrentConsumers(int concurrentConsumers)
Specifying a higher value for this setting will increase the standard level of scheduled concurrent consumers at runtime: This is effectively the minimum number of concurrent consumers which will be scheduled at any given time. This is a static setting; for dynamic scaling, consider specifying the "maxConcurrentConsumers" setting instead.
Raising the number of concurrent consumers is recommended in order to scale the consumption of events. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume events.
This setting can be modified at runtime, for example through JMX.
setMaxConcurrentConsumers(int)
public final int getConcurrentConsumers()
This returns the currently configured "concurrentConsumers" value; the number of currently scheduled/active consumers might differ.
See Also: getScheduledConsumerCount(), getActiveConsumerCount()
public void setMaxConcurrentConsumers(int maxConcurrentConsumers)
If this setting is higher than "concurrentConsumers", the listener container will dynamically schedule new consumers at runtime, provided that enough incoming messages are encountered. Once the load goes down again, the number of consumers will be reduced to the standard level ("concurrentConsumers") again.
Raising the number of concurrent consumers is recommended in order to scale the consumption of events. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume events.
This setting can be modified at runtime, for example through JMX.
setConcurrentConsumers(int)
public final int getMaxConcurrentConsumers()
This returns the currently configured "maxConcurrentConsumers" value; the number of currently scheduled/active consumers might differ.
See Also: getScheduledConsumerCount(), getActiveConsumerCount()
public void setMaxEventsPerTask(int maxEventsPerTask)
Default is unlimited (-1) in case of a standard TaskExecutor, and 1 in case of a SchedulingTaskExecutor that indicates a preference for short-lived tasks. Specify a number of 10 to 100 messages to balance between extremely long-lived and extremely short-lived tasks here.
Long-lived tasks avoid frequent thread context switches through sticking with the same thread all the way through, while short-lived tasks allow thread pools to control the scheduling. Hence, thread pools will usually prefer short-lived tasks.
This setting can be modified at runtime, for example through JMX.
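The default and the per-task bound described above can be expressed as two small functions. This is a minimal sketch, assuming that any negative value means "unlimited" and that the limit counts receive attempts; MaxEventsPerTaskSketch and its method names are hypothetical and not part of the container's API.

```java
final class MaxEventsPerTaskSketch {
    /** The value the text above uses for "unlimited". */
    static final int UNLIMITED = -1;

    /** Default as described: 1 if the executor prefers short-lived tasks, otherwise unlimited. */
    static int defaultMaxEventsPerTask(boolean prefersShortLivedTasks) {
        return prefersShortLivedTasks ? 1 : UNLIMITED;
    }

    /** True while a task that has made attemptsSoFar receive attempts may make another one. */
    static boolean mayReceiveAgain(int attemptsSoFar, int maxEventsPerTask) {
        // Assumption: every negative limit is treated as "no limit".
        return maxEventsPerTask < 0 || attemptsSoFar < maxEventsPerTask;
    }
}
```

With a limit of 10, a task ends after its tenth receive attempt and hands control back to the executor, which is what lets a thread pool interleave other work.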
See Also: setTaskExecutor(org.springframework.core.task.TaskExecutor), AbstractPollingEventListenerContainer.setReceiveTimeout(long), SchedulingTaskExecutor.prefersShortLivedTasks()
public int getMaxEventsPerTask()
public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit)
Within each task execution, a number of event reception attempts (according to the "maxEventsPerTask" setting) will each wait for an incoming event (according to the "receiveTimeout" setting). If all of those receive attempts in a given task return without an event, the task is considered idle with respect to received events. Such a task may still be rescheduled; however, once it reached the specified "idleTaskExecutionLimit", it will shut down (in case of dynamic scaling).
Raise this limit if you encounter too frequent scaling up and down. With a higher limit, an idle consumer is kept around longer, which avoids restarting a consumer once a new load of messages comes in. Alternatively, specify a higher "maxEventsPerTask" and/or "receiveTimeout" value, which will also lead to idle consumers being kept around for a longer time (while also increasing the average execution time of each scheduled task).
This setting can be modified at runtime, for example through JMX.
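The idle bookkeeping described above can be modeled as a counter per task. The sketch below rests on two assumptions that the text does not spell out: the idle count resets as soon as an execution receives an event, and a task only shuts down while more consumers are scheduled than "concurrentConsumers" asks for. IdleTaskSketch is a hypothetical name, not a class of the library.

```java
final class IdleTaskSketch {
    private final int idleTaskExecutionLimit;
    private int consecutiveIdleExecutions;

    IdleTaskSketch(int idleTaskExecutionLimit) {
        this.idleTaskExecutionLimit = idleTaskExecutionLimit;
    }

    /** Records one finished task execution; it counts as idle if no receive attempt returned an event. */
    void recordExecution(boolean receivedAnyEvent) {
        // Assumption: a single non-idle execution resets the idle count.
        consecutiveIdleExecutions = receivedAnyEvent ? 0 : consecutiveIdleExecutions + 1;
    }

    /** True if this task has hit the idle limit and is surplus to the standard consumer level. */
    boolean shouldShutDown(int scheduledConsumers, int concurrentConsumers) {
        return consecutiveIdleExecutions >= idleTaskExecutionLimit
                && scheduledConsumers > concurrentConsumers;
    }
}
```

A higher limit means a surplus consumer survives more empty executions before it is dropped, which is why raising it dampens frequent scaling up and down.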
See Also: setMaxEventsPerTask(int), AbstractPollingEventListenerContainer.setReceiveTimeout(long)
public int getIdleTaskExecutionLimit()
public void initialize()
Description copied from class: AbstractSpaceListeningContainer
Initialize this container. If the "activeWhenPrimary" flag is not set to true, this will call AbstractSpaceListeningContainer.doStart() (if it is set to true, the lifecycle of the container will be controlled by the current space mode). AbstractSpaceListeningContainer.doInitialize() will be called for additional initialization after the possible AbstractSpaceListeningContainer.doStart() call.
Overrides: initialize in class AbstractPollingEventListenerContainer
See Also: AbstractSpaceListeningContainer.onApplicationEvent(org.springframework.context.ApplicationEvent)
protected void doAfterStart() throws DataAccessException
Overrides: doAfterStart in class AbstractSpaceListeningContainer
Throws: DataAccessException
protected void doBeforeStop() throws DataAccessException
Overrides: doBeforeStop in class AbstractSpaceListeningContainer
Throws: DataAccessException
protected TaskExecutor createDefaultTaskExecutor()
Create a default TaskExecutor. The default implementation builds a SimpleAsyncTaskExecutor with the specified bean name (or the class name, if no bean name is specified) as thread name prefix.
See Also: SimpleAsyncTaskExecutor.SimpleAsyncTaskExecutor(String)
protected void doRescheduleTask(Object task)
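Re-executes the given task via this listener container's TaskExecutor.
Overrides: doRescheduleTask in class AbstractSpaceListeningContainer
Parameters: task - the task object to reschedule
See Also: setTaskExecutor(org.springframework.core.task.TaskExecutor)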
protected void eventReceived(Object event)
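Description copied from class: AbstractPollingEventListenerContainer
Template method that gets called right when a new message has been received, before attempting to process it.
Overrides: eventReceived in class AbstractPollingEventListenerContainer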
protected void scheduleNewInvokerIfAppropriate()
Called once an event has been received, to scale up while processing the event in the invoker that originally received it.
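The two conditions named in the method summary (the "maxConcurrentConsumers" limit has not been reached, and no idle invoker is already waiting) amount to a simple predicate. This is a sketch of that decision only; InvokerSchedulingSketch and its parameter names are hypothetical, and the real method also performs the actual scheduling.

```java
final class InvokerSchedulingSketch {
    /**
     * True if another invoker should be scheduled: the limit has not been reached
     * and no already-scheduled invoker is sitting idle waiting for events.
     */
    static boolean shouldScheduleNewInvoker(int scheduledInvokers, int idleInvokers, int maxConcurrentConsumers) {
        return scheduledInvokers < maxConcurrentConsumers && idleInvokers == 0;
    }
}
```

Checking for idle invokers first avoids growing the pool when existing capacity would pick up the next event anyway.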
See Also: setTaskExecutor(org.springframework.core.task.TaskExecutor), getMaxConcurrentConsumers()
public final int getScheduledConsumerCount()
This number will always be in between "concurrentConsumers" and "maxConcurrentConsumers", but might be higher than "activeConsumerCount" (in case of some consumers being scheduled but not executed at the moment).
See Also: getConcurrentConsumers(), getMaxConcurrentConsumers(), getActiveConsumerCount()
public final int getActiveConsumerCount()
This number will always be in between "concurrentConsumers" and "maxConcurrentConsumers", but might be lower than "scheduledConsumerCount" (in case of some consumers being scheduled but not executed at the moment).
See Also: getConcurrentConsumers(), getMaxConcurrentConsumers(), getActiveConsumerCount()
protected void handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered)
The default implementation logs the exception at error level if not recovered yet, and at debug level if already recovered. Can be overridden in subclasses.
Parameters: ex - the exception to handle
alreadyRecovered - whether a previously executing listener already recovered from the present listener setup failure (this usually indicates a follow-up failure that can be ignored other than for debug log purposes)
See Also: recoverAfterListenerSetupFailure()
protected void recoverAfterListenerSetupFailure()
The default implementation delegates to refreshConnectionUntilSuccessful, which pings the space until it is available.
See Also: refreshConnectionUntilSuccessful()
protected void refreshConnectionUntilSuccessful()
The default implementation pings the space until a successful ping has been established.
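A ping-until-successful loop with a pause between attempts can be sketched as follows. SpacePing and RecoveryLoopSketch are hypothetical stand-ins, not OpenSpaces types; the sketch assumes a failed ping is reported as false, and it leaves out the check a real container would presumably make for having been stopped in the meantime.

```java
/** Hypothetical stand-in for pinging the space; returns true once the space answers. */
interface SpacePing {
    boolean ping();
}

final class RecoveryLoopSketch {
    /**
     * Pings until one attempt succeeds, sleeping recoveryIntervalMillis after each failure.
     * Returns the number of attempts that were needed.
     */
    static int refreshUntilSuccessful(SpacePing ping, long recoveryIntervalMillis) {
        int attempts = 0;
        while (true) {
            attempts++;
            if (ping.ping()) {
                return attempts;
            }
            try {
                Thread.sleep(recoveryIntervalMillis); // pause before the next attempt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                throw new IllegalStateException("Interrupted while waiting to retry", e);
            }
        }
    }
}
```

The pause corresponds to the recovery interval set through setRecoveryInterval(long); without it the loop would spin against an unavailable space.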
See Also: setRecoveryInterval(long)
protected void sleepInbetweenRecoveryAttempts()
protected void doInitialize() throws DataAccessException
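Description copied from class: AbstractSpaceListeningContainer
A callback to perform custom initialization steps.
Overrides: doInitialize in class AbstractSpaceListeningContainer
Throws: DataAccessException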
protected void doShutdown() throws DataAccessException
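Destroy the container by waiting for all the current event listeners to shut down.
Overrides: doShutdown in class AbstractSpaceListeningContainer
Throws: DataAccessException
See Also: AbstractSpaceListeningContainer.shutdown()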
public ServiceDetails[] getServicesDetails()
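Description copied from interface: ServiceDetailsProvider
Returns one or more service details that the service exposes.
Specified by: getServicesDetails in interface ServiceDetailsProvider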
public ServiceMonitors[] getServicesMonitors()
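Description copied from interface: ServiceMonitorsProvider
Returns one or more service monitors that the service exposes.
Specified by: getServicesMonitors in interface ServiceMonitorsProvider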
public String getName()
Specified by: getName in interface com.gigaspaces.internal.dump.InternalDumpProcessor
public void process(com.gigaspaces.internal.dump.InternalDump dump) throws com.gigaspaces.internal.dump.InternalDumpProcessorFailedException
Specified by: process in interface com.gigaspaces.internal.dump.InternalDumpProcessor
Throws: com.gigaspaces.internal.dump.InternalDumpProcessorFailedException