Summary: How to call custom business logic when data is replicated in a replicated cluster topology.

Overview

When constructing a replicated space topology you may need to call some business logic when data is replicated. GigaSpaces provides the IReplicationFilter plug-in interface (com.j_spaces.core.cluster.IReplicationFilter; see Javadoc ), that allows you to build business logic that is called when data is sent through the replication channel.

The IReplicationFilter methods are called before data is sent to the replication channel from the source space (output mode) and after coming out from the replication channel - i.e. before written to the target space (input mode). The replication filter should implement the IReplicationFilter interface methods.

The replication filter can be used to monitor or alter the data passed through the replication channel. The replication channel passes Replication Filter Entry objects that store the replicated data.

The cluster schema should include a class name for input and output mode that implements the IReplicationFilter, and some free text parameter you can pass into the replication filter. You can use the same replication filter implementation class for both input and output replication modes.

  • com.j_spaces.core.cluster.IReplicationFilterEntry – represents an Entry instance passed to the IReplicationFilter.
  • com.j_spaces.core.cluster.IReplicationFilter – a replication filter is a special hook point inside the replication object. Two types of replication filters can be defined – input replication and output replication. If both of the classes specified (for input and output) are the same, only one filter object will be used for both input and output replication.
  • com.j_spaces.core.cluster.ReplicationFilterException – the ReplicationFilterException is thrown when there are errors that occur in the replication filter. Errors can happen in the source or target space. The error is wrapped as part of the ReplicationFilterException and thrown back to the client. The ReplicationFilterException includes methods that includes information about the origin of the error, replication mode (input/output), the implementation class and the underlying exception. The ReplicationFilterException.getCause() should be used to retrieve the original exception that occurred.
In GigaSpaces 6.0.2 and onwards, it is possible to control replication at the operation level, using configuration only. For more details, refer to the Replication Group - Cluster Schema*** section.

Guidelines for Replication Filters

  • In order to block a ReplicationFilterEntry to enter the replication channel and replicated to the target space, assign a DUMMY value in the IReplicationFilterEntry operation code field.
  • Please don't overwrite the m_Key (serial number) field of the packet.
  • Entry packet field values (m_FieldsValues array) may be changed, but notice that if the serialization type of the space is not 0 (that is, fields are serialized inside the space) – then each non-native field (i.e. not from the Java.lang package) is stored in the array in a serialized format.
  • For outgoing replication packets, if you want to change the values of some fields, deep-cloning of the m_FieldsValues array is needed, since the m_FieldsValues is a reference to the array stored in the space internal data structures.
  • When using synchronous replication and an error occurs at the replication filter implementation, ReplicationFilterException is thrown back into the relevant thread at the client application. The ReplicationFilterException can be originated at the source or target space. The ReplicationFilterException will include the relevant information to identify the origin and the underlying exception that caused the problem.
  • When using asynchronous replication and an error occurs at the replication filter implementation, the space replication channel will be disabled, and an error will be logged into the space log file and displayed at the space console. The client application continues to function against its source space but there will not be any replication into the target space. In order to enable the replication, you should use the IRemoteJSpaceAdmin.changeReplicationState() or use the space browser cluster viewer.

Example - Replication Filter

The following example will start two spaces (replfilter-spaceA and replfilter-spaceB) replicating data. The replication filter will display the replicated data that passed through the replication channel.

Find a full running example at <GigaSpaces ROOT>\examples\Advanced\Integration_Plugins\replicationfilter.

The example displays all sent/received ReplicationFilterEntry in the output filter. Notice that all packets are sent according to their replication policy. When either the Interval Milliseconds or the Interval Operations times out, a replication is performed.
DUMMY packets are sent when a sequence of operations performed on one space does not need to be performed again on the replicated ones. For example, when using asynchronous replication, a sequence of write and take on the same object does not need to replicated. Therefore, a DUMMY packet is sent instead. In contrast, the take operation is always replicated to ensure full replication.

In the example, in order to block explicitly some objects from being replicated, the IReplicationFilterEntry.DUMMY OpCode is sent.

The Cluster Schema File

The cluster configuration file includes the following members:

<?xml version="1.0"?>
<cluster-config>
     <cluster-name>replicationfilter-cluster</cluster-name>
     <dist-cache>
          <config-name>DefaultConfig</config-name>
     </dist-cache>
     <cluster-members>
          <member>
               <member-name>repFilterSpaceA_container:repFilterSpaceA</member-name>
               <member-url>jini://localhost/repFilterSpaceA_container/repFilterSpaceA</member-url>
          </member>
          <member>
               <member-name>repFilterSpaceB_container:repFilterSpaceB</member-name>
               <member-url>jini://localhost/repFilterSpaceB_container/repFilterSpaceB</member-url>
          </member>
     </cluster-members>
     <groups>
          <group>
             
               <repl-policy>
                    <replication-mode>sync</replication-mode>
                    <policy-type>full-replication</policy-type>
                    <recovery>true</recovery>
                    <replicate-notify-templates>false</replicate-notify-templates>
                    <trigger-notify-templates>true</trigger-notify-templates>
                    <repl-find-timeout>5000</repl-find-timeout>
                    <communication-mode>unicast</communication-mode>
                    <async-replication>
                         <repl-original-state>false</repl-original-state>
                         <sync-on-commit>false</sync-on-commit>
                         <sync-on-commit-timeout>300000</sync-on-commit-timeout>
                         <repl-chunk-size>500</repl-chunk-size>
                         <repl-interval-millis>3000</repl-interval-millis>
                         <repl-interval-opers>500</repl-interval-opers>
                    </async-replication>
                    <sync-replication>
                         <todo-queue-timeout>1500</todo-queue-timeout>
                         <unicast>
                              <min-work-threads>4</min-work-threads>
                              <max-work-threads>16</max-work-threads>
                         </unicast>
                         <multicast>
                              <ip-group>224.0.0.1</ip-group>
                              <port>28672</port>
                              <min-work-threads>4</min-work-threads>
                              <max-work-threads>16</max-work-threads>
                         </multicast>
                    </sync-replication>
               </repl-policy>
          </group>
     </groups>
</cluster-config>


The cluster schema defines the Input Filter Class Name and Output Filter Class Name for all members in the group.

<cluster-config>
     <groups>
          <group>
  <group-name>sync_replicated_group</group-name>
               <group-members>
                    <member>
                         <member-name>repFilterSpaceA_container:repFilterSpaceA</member-name>
                         <repl-filters>
                              <input-filter-className>com.j_spaces.examples.replfilter.DefaultReplicationFilter
                              </input-filter-className>
                              <input-filter-paramUrl>
                              </input-filter-paramUrl>
                              <output-filter-className>com.j_spaces.examples.replfilter.DefaultReplicationFilter
                              </output-filter-className>
                              <output-filter-paramUrl>
                              </output-filter-paramUrl>
                              <active-when-backup>false</active-when-backup> 
                     <shutdown-space-on-init-failure>true
                     </shutdown-space-on-init-failure>
                         </repl-filters>
                    </member>
                    <member>
                         <member-name>repFilterSpaceB_container:repFilterSpaceB</member-name>
                         <repl-filters>
                              <input-filter-className>com.j_spaces.examples.replfilter.DefaultReplicationFilter
                              </input-filter-className>
                              <input-filter-paramUrl>
                              </input-filter-paramUrl>
                              <output-filter-className>com.j_spaces.examples.replfilter.DefaultReplicationFilter
                              </output-filter-className>
                              <output-filter-paramUrl>
                              </output-filter-paramUrl>
                         </repl-filters>
                    </member>
               </group-members>
     </groups>
</cluster-config>

You can achieve the above definition using the GS browser. Open the cluster configuration file (./config/replfilter-cluster-config.xml) Cluster --> Open, and navigate to the "Edit cluster group" interface. Edit the Replication Matrix and select the Filters/Recovery tab. Enable the Input/Output section by checking Filter. This definition applies to all members of the group.

Distinguish between the space filter and a replication filter. The latter does not appear in the space schema file. Space filters implement the ISpaceFilter interface.
  • <active-when-backup> – defined for a certain member's replication filter, and controls the replication filter's activity mode. When set to false, the replication filter init method is called, only when the space is in active mode (i.e., the backup space is moved to active mode). When set to true, the replication filter init() method is also called for spaces running in backup mode.
  • <shutdown-space-on-init-failure> – defined for a certain member's replication filter. shutdown-space-on-init-failure=true shuts down the space in case of a replication filter initialization failure. Meaning, if the user-defined init() implementation throws any exception (it doesn't matter which exception), this means a failure has occurred.

Implementation

See below the replication filter implementation. It prints the input and output packet:
The Entry:

package com.j_spaces.examples.replfilter;
import net.jini.core.entry.Entry;
/**
 * Extended Message Entry, for using UID with entry Objects.
 * This is only done so we can show the entry UID's written to space
 */
public class Message implements Entry {
    transient public String m_UID;
    public String content;
    
    public Message() {}
    
    public Message(String msg) {
    this.content = msg;
    }
    
    public void __setEntryUID(String inUid) {
      this.m_UID = inUid;
    }
    public String __getEntryUID() {
        return m_UID;
    }
}

The Application
package com.j_spaces.examples.replfilter;
import com.j_spaces.core.IJSpace;
import com.j_spaces.core.client.FinderException;
import com.j_spaces.core.client.SpaceFinder;
import net.jini.core.lease.Lease;
public class HelloWorldReplicationFilter {
    public static void main(String[] args) {
        say("\nWelcome to GigaSpaces ReplicationFilter Example");        
        say("A replication filter is a class that implements the IReplicationFilter "+
        "interface in order to monitor or alter the replication ReplicationFilterEntry.\n");        
    if ( args.length != 2 ) {
      say("Usage: <spaceURL-A> <spaceURL-B>");
say("spaceURL = '<protocol>://host:port/containername/spacename'");
System.exit(1);
    }
    try {
    IJSpace spaceA = (IJSpace)SpaceFinder.find( args[0] );
    IJSpace spaceB = (IJSpace)SpaceFinder.find( args[1] );
    
    // lets make sure they are clean incase of a re-RUN
spaceA.clear(new Message() , null);
spaceB.clear(new Message() , null);
    
    Message msgFromAtoB = new Message("Hello World from Space A");
    Message msgFromBtoA = new Message("Hello World from Space B");
    
    //let A write to B, and B write to A
    say("[1] let A write to B, and B write to A");
    say("--------------------------------------");
    spaceA.write(msgFromAtoB, null, Lease.FOREVER);
    spaceB.write(msgFromBtoA, null, Lease.FOREVER);
    log(spaceA,"From A to B", "WRITE", msgFromAtoB);
    log(spaceB,"From B to A", "WRITE", msgFromBtoA);
    /* Since the interval for replication is 3 seconds,
     * we need to sleep to allow replication to occur.
     * defined in cluster-config file as:
     * <repl-interval-millis>3000</repl-interval-millis>
     */
    
    //count message from neighboring space; each should have 2 messages
    say("[2] count message from neighboring space; each should have 2 messages");
    say("---------------------------------------------------------------------");
    Message template = new Message();
    int countA, countB;
    while ( (countA = spaceA.count(template, null)) !=2 ||
    (countB = spaceB.count(template, null)) != 2 ) {
    say("\t Waiting for replication interval. Sleeping for 3 seconds...");
    Thread.sleep(3000);
    }
    System.out.println("\t Replication complete.\n");
    
    log(spaceA, "From A to A", "COUNT="+countA, template);
    log(spaceB, "From B to B", "COUNT="+countB, template);
    
    
    // let A update the message from B, and vise versa (Hello to Hey)
    say("[3] let A update the message from B, and vise versa (Hello to Hey)");
    say("-----------------------------------------------------------------");
    Message templateA = msgFromBtoA;
    Message templateB = msgFromAtoB;
    
    Message updatedByA = new Message("Hey World from Space B");
    Message updatedByB = new Message("Hey World from Space A");
    
    Message resultA = (Message)spaceA.update(templateA, updatedByA,
    null, Lease.FOREVER, 6000);
    Message resultB = (Message)spaceB.update(templateB, updatedByB,
    null, Lease.FOREVER, 6000);
    
    log(spaceA, "From A to A", "UPDATE", updatedByA);
    log(spaceB, "From B to B", "UPDATE", updatedByB);
    
    // let both take back the other's message
    say("[4] let both take back the other's message");
    say("------------------------------------------");
    Message takenA = (Message)spaceA.take(resultA, null, Integer.MAX_VALUE);
    Message takenB = (Message)spaceB.take(resultB, null, Integer.MAX_VALUE);
    
    log(spaceA, "From A to A", "TAKE", takenA);
    log(spaceB, "From B to B", "TAKE", takenB);
    
    
    // lets send a message that will be blocked by the replication filter
    // we will generate a specific message content to suffice the block rule,
    // and try to read it from the replicated space. It should return after timeout.
    say("[5] lets send a message that will be blocked by the replication filter");
    say("----------------------------------------------------------------------");
    Message blockedMsg = new Message("Block me");
    spaceA.write(blockedMsg, null, Lease.FOREVER);
    log(spaceA,"From A to B", "WRITE BLOCKED", blockedMsg);
    Message msgAtSpaceB = (Message)spaceB.read(blockedMsg, null, 5000);
    if (msgAtSpaceB == null)
        log(spaceB,"From B to B", "READ timedout, BLOCK succeeded", msgAtSpaceB);
    else
        log(spaceB,"From B to B", "READ complete, BLOCK failed", msgAtSpaceB);
    
    System.exit(0);
    } catch( FinderException ex ) {
    ex.printStackTrace();
    say("Could not find space: " + args[0]);
    say("Please check that the clustered space is running.");
    } catch (Exception e) {
          e.printStackTrace();
        }
   }
    
    private static void log(IJSpace space, String direction, String operation,
    Message entry) {
    System.out.println(
            "log"
+ " | Space: " + space.getName()
+ "\n\t | Direction: " + direction
+ "\n\t | Operation code: " + operation
+ "\n\t | Entry packet: " + (entry != null ? entry.__getEntryUID() : null)
+ "\n\t | content: " + (entry != null ? entry.content : null )+"\n");
    }
    
    private static void say(String msg) {
    System.out.println(msg);
    }
}

The Replication Filter implementation:

package com.j_spaces.examples.replfilter;
import com.j_spaces.core.IJSpace;
import com.j_spaces.core.cluster.IReplicationFilter;
import com.j_spaces.core.cluster.IReplicationFilterEntry;
import com.j_spaces.core.cluster.ReplicationFilterException;
import com.j_spaces.core.cluster.ReplicationPolicy;
public class DefaultReplicationFilter implements IReplicationFilter
{
private IJSpace space;
private String paramUrl;
private ReplicationPolicy replicationPolicy;
private int counter;
 
/**
 * Initialize this filter.
 * @see com.j_spaces.core.cluster.IReplicationFilter#init
 * (com.j_spaces.core.IJSpace, java.lang.String,
 * com.j_spaces.core.cluster.ReplicationPolicy)
 */
public void init(IJSpace space, String paramUrl,
ReplicationPolicy replicationPolicy) {
this.space = space;
this.paramUrl = paramUrl;
this.replicationPolicy = replicationPolicy;
this.counter = 0;
}
/**
 * Called by synchronize replication when a single IReplicationFilterEntry is about to be
 * sent/received (for output filter or input filter correspondingly).
 * @see com.j_spaces.core.cluster.IReplicationFilter#process(int, IReplicationFilterEntry, String)
 */
public void process(int direction, IReplicationFilterEntry replicationEntry,
String remoteSpaceMemberName) throws ReplicationFilterException {
 
       String filterDirectionStr = "";
        String operationCodeStr = "";
        
       switch( direction ) {
           case IReplicationFilter.FILTER_DIRECTION_INPUT:
            filterDirectionStr="INPUT";  
           break;
            case IReplicationFilter.FILTER_DIRECTION_OUTPUT:
                filterDirectionStr="OUTPUT";
            break;
       }
       switch ( replicationEntry.getOpCode()){
        case IReplicationFilterEntry.WRITE:            
            operationCodeStr = "WRITE";
        break;
        case IReplicationFilterEntry.TAKE:          
            operationCodeStr = "TAKE";
        break;
        case IReplicationFilterEntry.EXTEND_LEASE:  
            operationCodeStr = "EXTEND LEASE";
        break;
        case IReplicationFilterEntry.UPDATE:            
            operationCodeStr = "UPDATE";
        break;
        case IReplicationFilterEntry.DUMMY:         
            operationCodeStr = "DUMMY";
        break;
        case IReplicationFilterEntry.LEASE_EXPIRATION:
            operationCodeStr = "LEASE TERMINATION";
        break;
        default:
           /* lets disable the undefined packet,
            * by overwriting the operation code */
           operationCodeStr = "UNDEFINED "+ replicationEntry.getOpCode();
           replicationEntry.setOpCode( IReplicationFilterEntry.DUMMY);
       }
       
       counter++;   // increment the number of entries processed.
       
       System.out.println(
                "\nDefaultReplicationFilter"
                + "\n\t | Space: " + space.getName()
                + "\n\t | Packet No."+ counter
                + "\n\t | Direction: "+ filterDirectionStr
                + "\n\t | Operation code: "+ operationCodeStr
                + "\n\t | Entry packet UID: "
                + "\n\t | 2Str: "+ replicationEntry.toString()
                + replicationEntry.getUID() + "\n");
        /*
         * Lets Block the "Block me" entry on its way out
         */
        if (direction == IReplicationFilter.FILTER_DIRECTION_OUTPUT
                && replicationEntry.getOpCode() == IReplicationFilterEntry.WRITE
                && replicationEntry.getFieldsValues() != null
                && replicationEntry.getFieldsValues()[0].equals("Block me") )
        {
            System.out.println("\t | ==> Filter blocked outgoing message\n");
            // dismiss packet:
            replicationEntry.setOpCode( IReplicationFilterEntry.DUMMY);
        }
       
       /* note that in this example, an entry was changed only when the operation
        * was undefined. otherwise, we are simply printing all replicated traffic.
        */
}
/**
 * Closes this filter. Enables us (developers) to clean open resources.
 * We hint the Garbage Collector that we no longer need these resources.
 * @see com.j_spaces.core.cluster.IReplicationFilter#close()
 */
public void close() {
space = null;
paramUrl = null;
replicationPolicy = null;
}
}

Example - The XML Replication Filter

Download the XML replication filter example and extract it under \GigaSpaces Root\examples\Advanced\Integration_Plugins.

The XML replication filter example illustrates a generic replication filter that allows you to define the data to be filtered via XML.
The example is composed from 2 spaces: siteA space and siteB space, clustered via asynchronous replication policy, where a client application connects to spaceA and writes 2 entries. The XML file includes the class names and their values of a matching object. That object has security grants to be replicated to siteB.

Configuration Files

The example configurations files are located under the GigaSpaces Root\examples\Advanced\Integration_Plugins\XMLReplicationfilter\config folder.
cluster-configWXMLRepfilter.xml - the static cluster configuration. Includes the location of the XML based filter to be used by the replication filter implementation.
ReplicationFilterConfig.xml - The XML based filter. The objects to be replicated data
rfilter.properties - space property file. Includes the location of the static cluster configuration file.


The XML-based filter

This XML file includes the following elements:

<gigaspace-replication-filter>
<grant-objects>
<object type>
<member name>
<member value>

<?xml version="1.0"?>
<gigaspace-replication-filter>
	<grant-objects>
		<object type="com.j_spaces.examples.xmlreplicationfilter.MyEntry">
			<member name="m_counter" value="1" />
			<member name="m_anotherField" value="1" />
		</object>
	</grant-objects>
</gigaspace-replication-filter>

The Example Source Files

MyEntry.java - the entry class used by the application
ReplicationFilterTest.java - The writer application
Utility.java - Cluster utility class
GrantMemeber.java - Define the Class member XML elements
GrantObject.java- Define the Class XML elements
XML2Object.java - Converts XML to Class
XMLReplicationFilter.java - The Replication filter implementation


Script Files

runApplication.bat - the writer client application. Connect to siteA space , write 2 entries and print the content of siteA space and siteB space. SiteA space will have 2 entries of MyEntry and siteB space will have only 1 entry.
setExampleEnv.bat - global settings
startAllSites.bat - start siteA and siteB spaces.
startSiteA.bat - start siteA
startSiteB.bat - start siteB
compile - compiles the source files


Running the Application
  • run bin\ startAllSites - start the clustered spaces
  • run bin\ runApplication.bat - will run the client application.

To view siteA and siteB content - start the space browser. Notice that only one object has been replicated from spaceA to spaceB.


Expected Output

Client Application

welcome to GigaSpaces XML replication filter example!
Connect to space jini://localhost/*/siteA
CONFIG: Sets the system property ${com.gs.home} with value: E:\GigaSpacesXAP6.0\bin\\..\
Connect to space jini://localhost/*/siteA OK!
checking if siteB space active...
(UtilityClass-Output) node:siteB_container:siteB - Active:true
(UtilityClass-Output) Cluster Active!
Try Clean cluster...
Clean cluster OK!
Write Entry with m_counter=1   <-- This should be replicated to siteB Space
Write Entry with m_counter=2   <-- This should NOT be replicated to siteB Space
Finished Writing
site A space have 2 MyEntry instance
site B space have 1 MyEntry instance

siteA space MyEntry Objects
---------------------------
m_counter = 1
m_counter = 2

siteA output

********************************************************
com.j_spaces.examples.xmlreplicationfilter.MyEntry
********************************************************
m_counter Field Value = 1
Grant Member Value = 1
Object replicated to siteB_container:siteB
********************************************************
com.j_spaces.examples.xmlreplicationfilter.MyEntry
********************************************************
m_counter Field Value = 2
Grant Member Value = 1
Object will not be replicated to siteB_container:siteB

***Link required

GigaSpaces.com - Legal Notice - 3rd Party Licenses - Site Map - API Docs - Forum - Downloads - Blog - White Papers - Contact Tech Writing - Gen. by Atlassian Confluence