Summary: How to call custom business logic when data is replicated in a replicated cluster topology.
Overview
When constructing a replicated space topology, you may need to call business logic when data is replicated. GigaSpaces provides the IReplicationFilter plug-in interface (com.j_spaces.core.cluster.IReplicationFilter; see Javadoc), which lets you build business logic that is called when data is sent through the replication channel. The IReplicationFilter methods are called before data is sent to the replication channel from the source space (output mode) and after the data comes out of the replication channel, i.e. before it is written to the target space (input mode).
A replication filter must implement the IReplicationFilter interface methods. It can be used to monitor or alter the data passed through the replication channel. The replication channel passes replication filter entry objects (IReplicationFilterEntry) that store the replicated data.
The cluster schema should name a class that implements IReplicationFilter for input mode and for output mode, together with a free-text parameter that is passed to the replication filter. You can use the same replication filter implementation class for both input and output replication modes.
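The order in which the output and input filters run can be sketched with a small stand-alone model. This is not GigaSpaces code: SketchFilter, SketchPacket, SketchChannel and ChannelDemo are hypothetical stand-ins invented for illustration, and the discarded flag only approximates how a real filter dismisses a packet (the example further down does it by setting the operation code to DUMMY).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for IReplicationFilter (illustration only).
interface SketchFilter {
    int INPUT = 0;
    int OUTPUT = 1;

    void process(int direction, SketchPacket packet);
}

// Hypothetical stand-in for a replication filter entry.
class SketchPacket {
    String payload;
    boolean discarded;

    SketchPacket(String payload) {
        this.payload = payload;
    }
}

// Models one replication channel between a source and a target space.
class SketchChannel {
    private final SketchFilter outputFilter; // runs on the source side
    private final SketchFilter inputFilter;  // runs on the target side
    final List<String> targetSpace = new ArrayList<>();

    SketchChannel(SketchFilter outputFilter, SketchFilter inputFilter) {
        this.outputFilter = outputFilter;
        this.inputFilter = inputFilter;
    }

    void replicate(String payload) {
        SketchPacket packet = new SketchPacket(payload);
        // Output mode: called before the packet enters the channel.
        outputFilter.process(SketchFilter.OUTPUT, packet);
        if (packet.discarded) {
            return;
        }
        // Input mode: called after the channel, before the target is written.
        inputFilter.process(SketchFilter.INPUT, packet);
        if (!packet.discarded) {
            targetSpace.add(packet.payload);
        }
    }
}

class ChannelDemo {
    static List<String> run() {
        // One implementation serves both directions, as the text allows.
        SketchFilter filter = (direction, packet) -> {
            if (direction == SketchFilter.OUTPUT && packet.payload.startsWith("secret")) {
                packet.discarded = true; // monitor and drop on the way out
            }
            if (direction == SketchFilter.INPUT) {
                packet.payload = packet.payload.toUpperCase(); // alter on the way in
            }
        };
        SketchChannel channel = new SketchChannel(filter, filter);
        channel.replicate("hello");
        channel.replicate("secret data");
        return channel.targetSpace;
    }
}
```

Calling ChannelDemo.run() leaves only the altered "HELLO" packet in the modelled target space; the "secret data" packet is dropped by the output-mode call and never reaches the input-mode call.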
Guidelines for Replication Filters
Example - Replication Filter
The following example starts two spaces (replfilter-spaceA and replfilter-spaceB) that replicate data. The replication filter displays the replicated data that passes through the replication channel.
The example displays all sent and received ReplicationFilterEntry objects in the filter output. Note that all packets are sent according to their replication policy: a replication is performed as soon as either the Interval Milliseconds setting (repl-interval-millis) elapses or the Interval Operations setting (repl-interval-opers) is reached.
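The "whichever comes first" trigger described above can be sketched as follows. This is a simplified model and not the GigaSpaces implementation: IntervalBatcher and its members are hypothetical names, and time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an interval-based replication trigger.
class IntervalBatcher {
    private final int intervalOpers;    // flush after this many pending operations
    private final long intervalMillis;  // or after this much time since the last flush
    private final List<String> pending = new ArrayList<>();
    private long lastFlushMillis;
    int flushCount;

    IntervalBatcher(int intervalOpers, long intervalMillis, long nowMillis) {
        this.intervalOpers = intervalOpers;
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    // Records one operation; returns true if it triggered a replication.
    boolean record(String operation, long nowMillis) {
        pending.add(operation);
        return flushIfDue(nowMillis);
    }

    // Replicates the pending operations if either limit has been reached.
    boolean flushIfDue(long nowMillis) {
        boolean opersReached = pending.size() >= intervalOpers;
        boolean timeReached = nowMillis - lastFlushMillis >= intervalMillis;
        if (pending.isEmpty() || !(opersReached || timeReached)) {
            return false;
        }
        pending.clear(); // stands in for sending the batch
        lastFlushMillis = nowMillis;
        flushCount++;
        return true;
    }
}
```

With new IntervalBatcher(3, 3000, 0), the third recorded operation triggers a flush because the operation count is reached, while a single later operation is only flushed once 3000 milliseconds have passed since that flush.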
The Cluster Schema File
The cluster configuration file includes the following members:

<?xml version="1.0"?>
<cluster-config>
    <cluster-name>replicationfilter-cluster</cluster-name>
    <dist-cache>
        <config-name>DefaultConfig</config-name>
    </dist-cache>
    <cluster-members>
        <member>
            <member-name>repFilterSpaceA_container:repFilterSpaceA</member-name>
            <member-url>jini://localhost/repFilterSpaceA_container/repFilterSpaceA</member-url>
        </member>
        <member>
            <member-name>repFilterSpaceB_container:repFilterSpaceB</member-name>
            <member-url>jini://localhost/repFilterSpaceB_container/repFilterSpaceB</member-url>
        </member>
    </cluster-members>
    <groups>
        <group>
            <repl-policy>
                <replication-mode>sync</replication-mode>
                <policy-type>full-replication</policy-type>
                <recovery>true</recovery>
                <replicate-notify-templates>false</replicate-notify-templates>
                <trigger-notify-templates>true</trigger-notify-templates>
                <repl-find-timeout>5000</repl-find-timeout>
                <communication-mode>unicast</communication-mode>
                <async-replication>
                    <repl-original-state>false</repl-original-state>
                    <sync-on-commit>false</sync-on-commit>
                    <sync-on-commit-timeout>300000</sync-on-commit-timeout>
                    <repl-chunk-size>500</repl-chunk-size>
                    <repl-interval-millis>3000</repl-interval-millis>
                    <repl-interval-opers>500</repl-interval-opers>
                </async-replication>
                <sync-replication>
                    <todo-queue-timeout>1500</todo-queue-timeout>
                    <unicast>
                        <min-work-threads>4</min-work-threads>
                        <max-work-threads>16</max-work-threads>
                    </unicast>
                    <multicast>
                        <ip-group>224.0.0.1</ip-group>
                        <port>28672</port>
                        <min-work-threads>4</min-work-threads>
                        <max-work-threads>16</max-work-threads>
                    </multicast>
                </sync-replication>
            </repl-policy>
        </group>
    </groups>
</cluster-config>

<cluster-config>
    <groups>
        <group>
            <group-name>sync_replicated_group</group-name>
            <group-members>
                <member>
                    <member-name>repFilterSpaceA_container:repFilterSpaceA</member-name>
                    <repl-filters>
                        <input-filter-className>com.j_spaces.examples.replfilter.DefaultReplicationFilter</input-filter-className>
                        <input-filter-paramUrl></input-filter-paramUrl>
                        <output-filter-className>com.j_spaces.examples.replfilter.DefaultReplicationFilter</output-filter-className>
                        <output-filter-paramUrl></output-filter-paramUrl>
                        <active-when-backup>false</active-when-backup>
                        <shutdown-space-on-init-failure>true</shutdown-space-on-init-failure>
                    </repl-filters>
                </member>
                <member>
                    <member-name>repFilterSpaceB_container:repFilterSpaceB</member-name>
                    <repl-filters>
                        <input-filter-className>com.j_spaces.examples.replfilter.DefaultReplicationFilter</input-filter-className>
                        <input-filter-paramUrl></input-filter-paramUrl>
                        <output-filter-className>com.j_spaces.examples.replfilter.DefaultReplicationFilter</output-filter-className>
                        <output-filter-paramUrl></output-filter-paramUrl>
                    </repl-filters>
                </member>
            </group-members>
        </group>
    </groups>
</cluster-config>

You can create the same definition using the GS browser. Open the cluster configuration file (./config/replfilter-cluster-config.xml) via Cluster --> Open, and navigate to the "Edit cluster group" interface. Edit the Replication Matrix and select the Filters/Recovery tab. Enable the Input/Output section by checking Filter. This definition applies to all members of the group.
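To check which filter class each member is configured with, the repl-filters section can be read with the standard JDK DOM parser. The sketch below is not part of the GigaSpaces example: ReplFilterConfigReader is a hypothetical helper, and it assumes only the element names that appear in the configuration above (member, member-name, output-filter-className).

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper: lists the output filter class configured per member.
class ReplFilterConfigReader {

    // Returns member-name -> output filter class name for every <member>
    // element that declares an <output-filter-className>.
    static Map<String, String> outputFilters(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            Map<String, String> result = new LinkedHashMap<>();
            NodeList members = doc.getElementsByTagName("member");
            for (int i = 0; i < members.getLength(); i++) {
                Element member = (Element) members.item(i);
                NodeList names = member.getElementsByTagName("member-name");
                NodeList filters = member.getElementsByTagName("output-filter-className");
                if (names.getLength() == 0 || filters.getLength() == 0) {
                    continue; // member without a name or without a filter
                }
                // trim() removes whitespace left around the class name
                result.put(names.item(0).getTextContent().trim(),
                        filters.item(0).getTextContent().trim());
            }
            return result;
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse cluster configuration", e);
        }
    }
}
```

The same loop works for input-filter-className by changing the element name.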
Implementation
The replication filter implementation below prints the input and output packets. The example uses the following entry class:

package com.j_spaces.examples.replfilter;

import net.jini.core.entry.Entry;

/**
 * Extended Message Entry, for using UID with entry Objects.
 * This is only done so we can show the entry UIDs written to the space.
 */
public class Message implements Entry {
    transient public String m_UID;
    public String content;

    public Message() {}

    public Message(String msg) {
        this.content = msg;
    }

    public void __setEntryUID(String inUid) {
        this.m_UID = inUid;
    }

    public String __getEntryUID() {
        return m_UID;
    }
}

The Application

package com.j_spaces.examples.replfilter;

import com.j_spaces.core.IJSpace;
import com.j_spaces.core.client.FinderException;
import com.j_spaces.core.client.SpaceFinder;
import net.jini.core.lease.Lease;

public class HelloWorldReplicationFilter {

    public static void main(String[] args) {
        say("\nWelcome to GigaSpaces ReplicationFilter Example");
        say("A replication filter is a class that implements the IReplicationFilter "
            + "interface in order to monitor or alter the replication ReplicationFilterEntry.\n");

        if (args.length != 2) {
            say("Usage: <spaceURL-A> <spaceURL-B>");
            say("spaceURL = '<protocol>://host:port/containername/spacename'");
            System.exit(1);
        }

        try {
            IJSpace spaceA = (IJSpace) SpaceFinder.find(args[0]);
            IJSpace spaceB = (IJSpace) SpaceFinder.find(args[1]);

            // let's make sure they are clean in case of a re-run
            spaceA.clear(new Message(), null);
            spaceB.clear(new Message(), null);

            Message msgFromAtoB = new Message("Hello World from Space A");
            Message msgFromBtoA = new Message("Hello World from Space B");

            // let A write to B, and B write to A
            say("[1] let A write to B, and B write to A");
            say("--------------------------------------");
            spaceA.write(msgFromAtoB, null, Lease.FOREVER);
            spaceB.write(msgFromBtoA, null, Lease.FOREVER);
            log(spaceA, "From A to B", "WRITE", msgFromAtoB);
            log(spaceB, "From B to A", "WRITE", msgFromBtoA);

            /* Since the interval for replication is 3 seconds,
             * we need to sleep to allow replication to occur.
             * defined in cluster-config file as:
             * <repl-interval-millis>3000</repl-interval-millis>
             */
            // count messages from the neighboring space; each should have 2 messages
            say("[2] count message from neighboring space; each should have 2 messages");
            say("---------------------------------------------------------------------");
            Message template = new Message();
            int countA, countB;
            while ((countA = spaceA.count(template, null)) != 2
                    || (countB = spaceB.count(template, null)) != 2) {
                say("\t Waiting for replication interval. Sleeping for 3 seconds...");
                Thread.sleep(3000);
            }
            System.out.println("\t Replication complete.\n");
            log(spaceA, "From A to A", "COUNT=" + countA, template);
            log(spaceB, "From B to B", "COUNT=" + countB, template);

            // let A update the message from B, and vice versa (Hello to Hey)
            say("[3] let A update the message from B, and vice versa (Hello to Hey)");
            say("-----------------------------------------------------------------");
            Message templateA = msgFromBtoA;
            Message templateB = msgFromAtoB;
            Message updatedByA = new Message("Hey World from Space B");
            Message updatedByB = new Message("Hey World from Space A");
            Message resultA = (Message) spaceA.update(templateA, updatedByA, null, Lease.FOREVER, 6000);
            Message resultB = (Message) spaceB.update(templateB, updatedByB, null, Lease.FOREVER, 6000);
            log(spaceA, "From A to A", "UPDATE", updatedByA);
            log(spaceB, "From B to B", "UPDATE", updatedByB);

            // let both take back the other's message
            say("[4] let both take back the other's message");
            say("------------------------------------------");
            Message takenA = (Message) spaceA.take(resultA, null, Integer.MAX_VALUE);
            Message takenB = (Message) spaceB.take(resultB, null, Integer.MAX_VALUE);
            log(spaceA, "From A to A", "TAKE", takenA);
            log(spaceB, "From B to B", "TAKE", takenB);

            // let's send a message that will be blocked by the replication filter:
            // we generate a specific message content that satisfies the block rule,
            // and try to read it from the replicated space. The read should return after the timeout.
            say("[5] lets send a message that will be blocked by the replication filter");
            say("----------------------------------------------------------------------");
            Message blockedMsg = new Message("Block me");
            spaceA.write(blockedMsg, null, Lease.FOREVER);
            log(spaceA, "From A to B", "WRITE BLOCKED", blockedMsg);
            Message msgAtSpaceB = (Message) spaceB.read(blockedMsg, null, 5000);
            if (msgAtSpaceB == null)
                log(spaceB, "From B to B", "READ timed out, BLOCK succeeded", msgAtSpaceB);
            else
                log(spaceB, "From B to B", "READ complete, BLOCK failed", msgAtSpaceB);

            System.exit(0);
        } catch (FinderException ex) {
            ex.printStackTrace();
            say("Could not find space: " + args[0]);
            say("Please check that the clustered space is running.");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void log(IJSpace space, String direction, String operation, Message entry) {
        System.out.println("log"
            + " | Space: " + space.getName()
            + "\n\t | Direction: " + direction
            + "\n\t | Operation code: " + operation
            + "\n\t | Entry packet: " + (entry != null ? entry.__getEntryUID() : null)
            + "\n\t | content: " + (entry != null ? entry.content : null) + "\n");
    }

    private static void say(String msg) {
        System.out.println(msg);
    }
}

The Replication Filter implementation:

package com.j_spaces.examples.replfilter;

import com.j_spaces.core.IJSpace;
import com.j_spaces.core.cluster.IReplicationFilter;
import com.j_spaces.core.cluster.IReplicationFilterEntry;
import com.j_spaces.core.cluster.ReplicationFilterException;
import com.j_spaces.core.cluster.ReplicationPolicy;

public class DefaultReplicationFilter implements IReplicationFilter {
    private IJSpace space;
    private String paramUrl;
    private ReplicationPolicy replicationPolicy;
    private int counter;

    /**
     * Initialize this filter.
     * @see com.j_spaces.core.cluster.IReplicationFilter#init
     * (com.j_spaces.core.IJSpace, java.lang.String,
     * com.j_spaces.core.cluster.ReplicationPolicy)
     */
    public void init(IJSpace space, String paramUrl, ReplicationPolicy replicationPolicy) {
        this.space = space;
        this.paramUrl = paramUrl;
        this.replicationPolicy = replicationPolicy;
        this.counter = 0;
    }

    /**
     * Called by synchronize replication when a single IReplicationFilterEntry is about to be
     * sent/received (for output filter or input filter correspondingly).
     * @see com.j_spaces.core.cluster.IReplicationFilter#process(int, IReplicationFilterEntry, String)
     */
    public void process(int direction, IReplicationFilterEntry replicationEntry,
            String remoteSpaceMemberName) throws ReplicationFilterException {
        String filterDirectionStr = "";
        String operationCodeStr = "";

        switch (direction) {
            case IReplicationFilter.FILTER_DIRECTION_INPUT:
                filterDirectionStr = "INPUT";
                break;
            case IReplicationFilter.FILTER_DIRECTION_OUTPUT:
                filterDirectionStr = "OUTPUT";
                break;
        }

        switch (replicationEntry.getOpCode()) {
            case IReplicationFilterEntry.WRITE:
                operationCodeStr = "WRITE";
                break;
            case IReplicationFilterEntry.TAKE:
                operationCodeStr = "TAKE";
                break;
            case IReplicationFilterEntry.EXTEND_LEASE:
                operationCodeStr = "EXTEND LEASE";
                break;
            case IReplicationFilterEntry.UPDATE:
                operationCodeStr = "UPDATE";
                break;
            case IReplicationFilterEntry.DUMMY:
                operationCodeStr = "DUMMY";
                break;
            case IReplicationFilterEntry.LEASE_EXPIRATION:
                operationCodeStr = "LEASE TERMINATION";
                break;
            default:
                /* let's disable the undefined packet
                 * by overwriting the operation code */
                operationCodeStr = "UNDEFINED " + replicationEntry.getOpCode();
                replicationEntry.setOpCode(IReplicationFilterEntry.DUMMY);
        }

        counter++; // increment the number of entries processed.

        System.out.println("\nDefaultReplicationFilter"
            + "\n\t | Space: " + space.getName()
            + "\n\t | Packet No." + counter
            + "\n\t | Direction: " + filterDirectionStr
            + "\n\t | Operation code: " + operationCodeStr
            + "\n\t | Entry packet UID: " + replicationEntry.getUID()
            + "\n\t | 2Str: " + replicationEntry.toString() + "\n");

        /*
         * Let's block the "Block me" entry on its way out
         */
        if (direction == IReplicationFilter.FILTER_DIRECTION_OUTPUT
                && replicationEntry.getOpCode() == IReplicationFilterEntry.WRITE
                && replicationEntry.getFieldsValues() != null
                && replicationEntry.getFieldsValues().length > 0
                && "Block me".equals(replicationEntry.getFieldsValues()[0])) {
            System.out.println("\t | ==> Filter blocked outgoing message\n");
            // dismiss packet:
            replicationEntry.setOpCode(IReplicationFilterEntry.DUMMY);
        }

        /* Note that in this example an entry is changed only when its operation
         * is undefined or when it is the "Block me" entry. Otherwise, we are
         * simply printing all replicated traffic.
         */
    }

    /**
     * Closes this filter. Enables us (developers) to clean open resources.
     * We hint the Garbage Collector that we no longer need these resources.
     * @see com.j_spaces.core.cluster.IReplicationFilter#close()
     */
    public void close() {
        space = null;
        paramUrl = null;
        replicationPolicy = null;
    }
}

Example - The XML Replication Filter
The XML replication filter example illustrates a generic replication filter that allows you to define the data to be filtered via XML.
Configuration Files
The example configuration files are located under the GigaSpaces Root\examples\Advanced\Integration_Plugins\XMLReplicationfilter\config folder.

The XML-based filter
This XML file includes the following elements:
<gigaspace-replication-filter>
<grant-objects>
<object type>
<member name>
<member value>

<?xml version="1.0"?>
<gigaspace-replication-filter>
    <grant-objects>
        <object type="com.j_spaces.examples.xmlreplicationfilter.MyEntry">
            <member name="m_counter" value="1" />
            <member name="m_anotherField" value="1" />
        </object>
    </grant-objects>
</gigaspace-replication-filter>

The Example Source Files
MyEntry.java - the entry class used by the application.

Script Files
runApplication.bat - the writer client application. It connects to the siteA space, writes 2 entries, and prints the content of the siteA and siteB spaces. The siteA space will have 2 MyEntry entries and the siteB space will have only 1.

Running the Application
To view the siteA and siteB content, start the space browser. Notice that only one object has been replicated from siteA to siteB.

Expected Output
Client Application
welcome to GigaSpaces XML replication filter example!
Connect to space jini://localhost/*/siteA
CONFIG: Sets the system property ${com.gs.home} with value: E:\GigaSpacesXAP6.0\bin\\..\
Connect to space jini://localhost/*/siteA OK!
checking if siteB space active...
(UtilityClass-Output) node:siteB_container:siteB - Active:true
(UtilityClass-Output) Cluster Active!
Try Clean cluster...
Clean cluster OK!
Write Entry with m_counter=1 <-- This should be replicated to siteB Space
Write Entry with m_counter=2 <-- This should NOT be replicated to siteB Space
Finished Writing
site A space have 2 MyEntry instance
site B space have 1 MyEntry instance
siteA space MyEntry Objects
---------------------------
m_counter = 1
m_counter = 2

siteA output
********************************************************
com.j_spaces.examples.xmlreplicationfilter.MyEntry
********************************************************
m_counter Field Value = 1
Grant Member Value = 1
Object replicated to siteB_container:siteB
********************************************************
com.j_spaces.examples.xmlreplicationfilter.MyEntry
********************************************************
m_counter Field Value = 2
Grant Member Value = 1
Object will not be replicated to siteB_container:siteB
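The decision visible in the siteA output (field value compared against the granted member value) can be sketched as a simple lookup. This is not the source of the XML replication filter: GrantRule is a hypothetical class, and the rule that every granted member must match is an assumption, since the example output only shows the m_counter comparison.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one <object> element inside <grant-objects>.
class GrantRule {
    // member name -> granted value, as read from <member name="..." value="..."/>
    private final Map<String, String> grantedValues = new HashMap<>();

    GrantRule grant(String memberName, String value) {
        grantedValues.put(memberName, value);
        return this;
    }

    // Assumption: an entry is replicated only if every granted member
    // is present in the entry and has the granted value.
    boolean allowsReplication(Map<String, Object> entryFields) {
        for (Map.Entry<String, String> granted : grantedValues.entrySet()) {
            Object actual = entryFields.get(granted.getKey());
            if (actual == null || !granted.getValue().equals(String.valueOf(actual))) {
                return false;
            }
        }
        return true;
    }
}
```

With a rule built as new GrantRule().grant("m_counter", "1"), an entry whose m_counter is 1 is allowed and an entry whose m_counter is 2 is rejected, which matches the two decisions printed above.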