Summary: How to call custom business logic when data is replicated in a replicated cluster topology.
OverviewWhen constructing a replicated space topology you may need to call some business logic when data is replicated. GigaSpaces provides the IReplicationFilter plug-in interface com.j_spaces.core.cluster.IReplicationFilter , that allows you to build business logic that is called when data is sent through the replication channel. The IReplicationFilter methods are called before data is sent to the replication channel from the source space (output mode) and after coming out from the replication channel - i.e. before written to the target space (input mode). The replication filter should implement the IReplicationFilter interface methods. The replication filter can be used to monitor or alter the data passed through the replication channel. The replication channel passes IReplicationFilterEntry objects that store the replicated data. You should DefaultReplicationFilterProviderFactory and set its Replication Filter implementation when constructing the Space. You can use the same replication filter implementation class for both input and output replication modes. Here are the classes you will be using with your Replication Filter implementation:
Guidelines for Cluster Replication Filters
Example - Replication FilterThe following example will start two spaces replicating data to each other. The replication filter will display the replicated data that is passed through the replication channel. The example displays all objects sent via the output filter. When an object with the data Block me is passed, it is blocking by setting the replication Operation Type to ReplicationOperationType.DISCARD.
The Space Class
package com.test; import com.gigaspaces.annotation.pojo.SpaceClass; import com.gigaspaces.annotation.pojo.SpaceId; @SpaceClass public class MyClass { String id; String data; @SpaceId(autoGenerate = false) public String getId() { return id; } public void setId(String id) { this.id = id; } public String getData() { return data; } public void setData(String data) { this.data = data; } } The Application package com.test; import org.openspaces.core.GigaSpace; import org.openspaces.core.GigaSpaceConfigurer; import org.openspaces.core.space.UrlSpaceConfigurer; import org.openspaces.core.space.filter.replication.DefaultReplicationFilterProviderFactory; public class ReplicationFilterTestMain { public static void main(String[] args) throws Exception{ DefaultReplicationFilterProviderFactory repFactory = new DefaultReplicationFilterProviderFactory (); repFactory.setOutputFilter(new RepFilter()); repFactory.afterPropertiesSet(); GigaSpace gigaspace1 = new GigaSpaceConfigurer( new UrlSpaceConfigurer("/./space?cluster_schema=sync_replicated&total_members=2&id=1") .replicationFilterProvider(repFactory)). gigaSpace(); GigaSpace gigaspace2 = new GigaSpaceConfigurer( new UrlSpaceConfigurer("/./space?cluster_schema=sync_replicated&total_members=2&id=2")). gigaSpace(); MyClass o1 = new MyClass(); o1.setId("1"); o1.setData("AAA"); gigaspace1.write(o1); MyClass o2 = new MyClass(); o2.setId("2"); o2.setData("Block me"); gigaspace1.write(o2); MyClass o3 = gigaspace2.readById(MyClass.class,"1"); if (o3 != null) System.out.println("Replicated Object ID 1 value is:" + o3.getData()); MyClass o4 = gigaspace2.readById(MyClass.class,"2"); if (o4 != null) System.out.println("Replicated Object ID 2 value is:" + o4.getData()); else System.out.println("Object ID 2 has not been replicated"); } } The Replication Filter package com.test; import java.util.concurrent.atomic.AtomicInteger; import org.openspaces.core.GigaSpace; import org.openspaces.core.GigaSpaceConfigurer; import com.j_spaces.core.IJSpace; import com.j_spaces.core.client.ClientUIDHandler; import com.j_spaces.core.cluster.IReplicationFilter; import com.j_spaces.core.cluster.IReplicationFilterEntry; import com.j_spaces.core.cluster.ReplicationOperationType; import com.j_spaces.core.cluster.ReplicationPolicy; public class RepFilter implements IReplicationFilter{ @Override public void close() { } GigaSpace gigaspace = null; @Override public void init(IJSpace space, String paramUrl, ReplicationPolicy replicationPolicy) { // TODO Auto-generated method stub gigaspace = new GigaSpaceConfigurer(space).gigaSpace(); System.out.println("Rep Filter - Created "+gigaspace); } AtomicInteger counter = new AtomicInteger(0); @Override public void process(int direction, IReplicationFilterEntry replicationEntry, String remoteSpaceMemberName) { String filterDirectionStr = ""; String operationCodeStr = ""; switch( direction ) { case IReplicationFilter.FILTER_DIRECTION_INPUT: filterDirectionStr="INPUT"; break; case IReplicationFilter.FILTER_DIRECTION_OUTPUT: filterDirectionStr="OUTPUT"; break; } counter.incrementAndGet(); // increment the number of entries processed. System.out.println( "\nDefaultReplicationFilter" + "\n\t | Space: " + gigaspace + "\n\t | Packet No."+ counter + "\n\t | Direction: "+ filterDirectionStr + "\n\t | Operation code: "+ operationCodeStr + "\n\t | Entry packet UID: " + "\n\t | 2Str: "+ replicationEntry.toString() + replicationEntry.getUID() + "\n"); /* * Lets Block the "Block me" object on its way out */ if (direction == IReplicationFilter.FILTER_DIRECTION_OUTPUT && replicationEntry.getOperationType().equals(ReplicationOperationType.WRITE) && replicationEntry.getFieldsValues() != null && replicationEntry.getFieldValue("data").equals("Block me")) { System.out.println("\t | ==> Filter blocked outgoing object\n"); // dismiss replication packet: replicationEntry.discard();; } } } |
![]() |
GigaSpaces.com - Legal Notice - 3rd Party Licenses - Site Map - API Docs - Forum - Downloads - Blog - White Papers - Contact Tech Writing - Gen. by Atlassian Confluence |