Summary: This page explains how to plug a custom interceptor into the gateway, which can be used to implement custom logic on incoming replication events in a target gateway.

Overview

The Synchronization Endpoint Interceptor allows custom logic to be plugged into the Replication Gateway, which can serve various use cases. One use case is custom handling of distributed transaction consolidation failure events, for example keeping track of such events in a database for later diagnostics and decisions. The interceptor is plugged into the target gateway sink upon construction.

The Interceptor API

The interceptor is an abstract class that can be extended to provide custom behavior only for the relevant methods.

    public abstract class SynchronizationEndpointInterceptor {
        /**
         * Triggered when a consolidation for a specific distributed transaction participant is failed due to
         * timeout or too much backlog accumulation while waiting for other participant parts.
         * @param participantData the transaction participant data for which the consolidation failed
         */
        public void onTransactionConsolidationFailure(ConsolidationParticipantData participantData) {
            participantData.commit();
        }

        /**
         * Triggered after synchronization of a transaction was completed successfully.
         * @param transactionData the transaction data
         */
        public void afterTransactionSynchronization(TransactionData transactionData) {
        }

        /**
         * Triggered after synchronization batch of operations was completed successfully.
         * @param batchData the batched operations data
         */
        public void afterOperationsBatchSynchronization(OperationsBatchData batchData) {
        }
    }

How to Plug a Custom Interceptor

To use a custom interceptor implementation, first extend the SynchronizationEndpointInterceptor class, then reference the subclass in the target gateway pu.xml configuration:

    <bean id="interceptor" class="com.gigaspaces.examples.MyCustomSynchronizationEndpointInterceptor" />

    <os-gateway:sink id="sink" local-gateway-name="MY-SITE-NAME"
        gateway-lookups="gatewayLookups" local-space-url="jini://*/*/mySiteSpace">
        <os-gateway:sources>
            <os-gateway:source name="..." />
            ...
        </os-gateway:sources>
        ...
        <os-gateway:sync-endpoint-interceptor interceptor="interceptor"/>
    </os-gateway:sink>

Handling Intercepted Events

There are three events the interceptor can receive and act upon.

On Transaction Consolidation Failure

This event is triggered upon distributed transaction consolidation failure. Refer to Gateway and Distributed Transactions for more information about the scenarios that trigger this event.

After Transaction Synchronization

This event is triggered after the entire transaction has been successfully replicated to the final target. The TransactionData contains all the data that the transaction consists of, including metadata and the source of the transaction.

After Operations Batch Synchronization

This event is triggered after a batch of non-transactional operations has been successfully replicated to the final target. The batching is determined by the gateway logic and does not reflect the original client batch of operations, if one existed. However, it maintains the original order of operations in the source. The OperationsBatchData contains the relevant data: the batch of operations itself and the source of these operations.

The Data Sync Operation

Each of the intercepted events above carries a data item that includes the relevant data synchronization operations. A transaction contains the operations that were executed within its boundaries. A batch of operations contains the list of operations that were synchronized in that batch. Each of these operations is a DataSyncOperation, which exposes the details of a single data synchronization operation.

    public interface DataSyncOperation {
        /**
         * @return The operation UID.
         */
        String getUid();

        /**
         * @return the operation type.
         */
        DataSyncOperationType getDataSyncOperationType();

        /**
         * @return the operation data as object (i.e pojo), this can only be used
         * if {@link #supportsDataAsObject()} return true, otherwise an exception
         * will be thrown.
         */
        Object getDataAsObject();

        /**
         * @return the operation data as space document, this can only be
         * used if {@link #supportsDataAsDocument()} return true, otherwise an exception
         * will be thrown.
         */
        SpaceDocument getDataAsDocument();

        /**
         * @return the type descriptor of the data type. this can only be
         * used if {@link #supportsGetTypeDescriptor()} return true, otherwise an exception
         * will be thrown.
         */
        SpaceTypeDescriptor getTypeDescriptor();

        /**
         * @return whether this data operation support the {@link #getTypeDescriptor()} operation.
         */
        boolean supportsGetTypeDescriptor();

        /**
         * @return whether this data operation support the {@link #getDataAsObject()} operation.
         */
        boolean supportsDataAsObject();

        /**
         * @return whether this data operation support the {@link #getDataAsDocument()} operation.
         */
        boolean supportsDataAsDocument();
    }

This API exposes the operation type, such as Write, Update, Remove and so on, as well as the entry itself where one exists.

Example of an Interceptor Handling Consolidation Failure Events

The following example demonstrates how to implement an interceptor that stores, in an external data store, the list of distributed transactions that failed to consolidate, and aborts them for a later manual decision. Note that there is a regular case in which consolidation may report a false failure, as described in Gateway and Distributed Transactions. This example handles that case as well.

    public class ExampleSynchronizationEndpointInterceptor extends SynchronizationEndpointInterceptor {
        ...
        private SomeExternalDataSource externalDataSource = ...

        @Override
        public void onTransactionConsolidationFailure(ConsolidationParticipantData participantData) {
            TransactionParticipantMetaData metadata = participantData.getTransactionParticipantMetadata();
            // Record the failure only if the transaction was not already executed
            // as a consolidated transaction (the false-failure case).
            if (!externalDataSource.isExecuted(metadata.getTransactionUniqueId())) {
                DataSyncOperation[] operations = participantData.getTransactionParticipantDataItems();
                externalDataSource.storeConsolidationFailedTransaction(metadata, operations);
            }
            participantData.abort();
        }

        @Override
        public void afterTransactionSynchronization(TransactionData transactionData) {
            // Remember consolidated transactions that were executed successfully.
            if (transactionData.isConsolidated()) {
                externalDataSource.storeExecutedConsolidatedTransactionMetadata(
                    transactionData.getConsolidatedDistributedTransactionMetaData().getTransactionUniqueId());
            }
        }
    }
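
The DataSyncOperation Javadoc above states that getDataAsObject() throws an exception unless supportsDataAsObject() returns true. The following is a minimal sketch of that guard, not GigaSpaces code. To compile without the GigaSpaces libraries it uses a hypothetical stand-in interface, SyncOperation, that mirrors only three of the DataSyncOperation methods. The class name GuardedOperationAccess and the helpers collectObjects and operation are likewise illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

public class GuardedOperationAccess {

    /** Hypothetical stand-in mirroring a subset of DataSyncOperation (assumption, not the real type). */
    public interface SyncOperation {
        String getUid();
        boolean supportsDataAsObject();
        Object getDataAsObject();
    }

    /**
     * Collects the object payloads of the given operations, skipping every
     * operation that does not support object access instead of letting it throw.
     */
    public static List<Object> collectObjects(SyncOperation[] operations) {
        List<Object> result = new ArrayList<>();
        for (SyncOperation op : operations) {
            if (op.supportsDataAsObject()) {
                result.add(op.getDataAsObject());
            }
        }
        return result;
    }

    /** Illustrative test double: a null payload models an operation without object support. */
    public static SyncOperation operation(final String uid, final Object payload) {
        return new SyncOperation() {
            @Override
            public String getUid() {
                return uid;
            }

            @Override
            public boolean supportsDataAsObject() {
                return payload != null;
            }

            @Override
            public Object getDataAsObject() {
                if (payload == null) {
                    throw new UnsupportedOperationException("data as object not supported: " + uid);
                }
                return payload;
            }
        };
    }
}
```

In a real interceptor the same check would presumably be applied to each DataSyncOperation received in an event, with supportsDataAsDocument() guarding getDataAsDocument() in the same way.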