Summary: This example gives a more extensive overview of OpenSpaces, the Processing Unit and the enhanced configurations in the pu.xml file. The example contains two Processing Units; one that feeds data objects into the system, and another that reads those objects and processes them.
Example Root: <GigaSpaces Root>\examples\openspaces\data

Overview

The purpose of this example is to demonstrate many of the capabilities that come with OpenSpaces, and to walk you through a complete application that can easily scale. Building on the Hello World Example, this example introduces the following concepts:

  • POJO Annotations
  • Declarative GigaSpace and transactions
  • OpenSpaces event components – polling container, notify container and data event listener
  • OpenSpaces remoting
  • OpenSpaces clustering and SLA
  • Using GigaSpaces JMS in OpenSpaces – new in GigaSpaces 6.0.1
  • Using the JMS MessageConverter feature that writes POJOs to the space using the JMS API – new in GigaSpaces 6.0.1

Having introduced the Processing Unit concept in the Hello World Example, we now show that a PU can contain several services that are independent of each other but serve different purposes within one application. Moreover, different Processing Units use the space to share data, i.e. the same domain model. This is in fact the concept behind SOA, and with OpenSpaces and SBA (Space-Based Architecture), GigaSpaces provides a SOA platform that scales linearly while providing high performance and low latency.

Architecture

This example includes two modules that are deployed to the grid, and a domain model that consists of Data objects which are shared between the two modules. Each module runs within a Processing Unit: one runs the DataFeeder and JMSDataFeeder beans, which write Data objects containing raw data into the remote space. The space is actually embedded within the other Processing Unit, which runs the DataProcessor bean. The DataProcessor service takes the new Data objects, processes the raw data and writes the objects back to the space. Both Processing Units run additional services that interact with the space or with the other services, in polling mode, notify mode or remotely.

The entire application looks like this:

Application Workflow

  1. The DataFeeder writes non-processed Data objects into the space every second.
  2. The JMSDataFeeder (new in GigaSpaces 6.0.1) uses Spring's JmsTemplate over GigaSpaces JMS to write non-processed Data objects into the space every second.
  3. The ViewDataCounter performs a count on the space every second for all Data objects (processed and non-processed).
  4. The DataRemoting service performs direct remote calls on the DataProcessor service.
  5. The DataProcessor takes non-processed Data objects, processes them, and writes the processed Data objects back to the space.
  6. The DataProcessedCounter receives notifications of processed Data objects that are written or updated in the space. Notifications are configured to arrive in batches of 10 objects or every 5 seconds, whichever comes first.

The services described above are independent of each other. They are merely explained here to show different capabilities of OpenSpaces.

The POJO Domain Model

The only object in our model is the Data object.

@SpaceClass
public class Data implements Serializable {

    public static long[] TYPES = {1, 2, 3, 4};
    private String id;
    private Long type;
    private String rawData;
    private String data;
    private Boolean processed;
[..]
    @SpaceId(autoGenerate = true)
    public String getId() {
        return id;
    }

    /**
     * The type of the data object. Used as the routing field when working with
     * a partitioned space.
     */
    @SpaceRouting
    public Long getType() {
        return type;
    }
[..]

Note the annotations that are used in this object:

  • @SpaceClass – the marked object is written into a space.
  • @SpaceId – marks the id attribute as the unique ID of this class. Each instance of Data should have a unique ID value.
  • @SpaceRouting – when using a partitioned cluster topology, Data objects are routed to the appropriate partitions according to the specified attribute, in this case type.
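To illustrate @SpaceRouting: conceptually, a partitioned cluster maps each routing value to a partition, typically by hashing it modulo the number of partitions. The following is a minimal sketch of that idea only; the actual GigaSpaces routing algorithm is an internal detail:

```java
// Illustrative only: how a routing field determines the target partition.
// This is NOT the GigaSpaces implementation, just the underlying principle.
public class RoutingSketch {

    // Map a routing value (e.g. the Data object's type) to a partition index.
    public static int partitionFor(Long routingValue, int partitionCount) {
        // hashCode() % partitionCount is in (-partitionCount, partitionCount),
        // so Math.abs yields a valid non-negative partition index.
        return Math.abs(routingValue.hashCode() % partitionCount);
    }
}
```

Because the mapping is deterministic, all Data objects with the same type always land in the same partition, which is what makes content-based routing work.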

Basically, every Data object is written to the space by the DataFeeder or by the JMSDataFeeder with the processed value set to false, which is later set to true by the DataProcessor.

Even though our object implements Serializable, it does not always have to; this is only required when the Data object is passed in remote calls.

POJO Services and Wiring with Spring

According to the diagram above, there are six different services in our application, each of which is independent of the others and performs a different task, as detailed below. For each service, we also describe the relevant wiring, which resides in the corresponding pu.xml file (there are two pu.xml files, one for each Processing Unit, as shown in the diagram above).

DataProcessor

Code
public class DataProcessor implements IDataProcessor {

    private long workDuration = 100;

    public void setWorkDuration(long workDuration) {
        this.workDuration = workDuration;
    }

    @SpaceDataEvent
    public Data processData(Data data) {
        // sleep to simulate some work
        try {
            Thread.sleep(workDuration);
        } catch (InterruptedException e) {
            // do nothing
        }
        data.setProcessed(true);
        data.setData("PROCESSED : " + data.getRawData());
        System.out.println(" ------ PROCESSED : " + data);
        // reset the id as we use auto generate true
        data.setId(null);
        return data;
    }

    public void sayData(Data data) {
        System.out.println(" ++++ SAYING : " + data);
    }
}

Configuration
<bean id="dataProcessor" class="org.openspaces.example.data.processor.DataProcessor"/>

<os-events:polling-container id="dataProcessorPollingEventContainer" giga-space="gigaSpace">
    <os-events:tx-support tx-manager="transactionManager"/>
    <os-core:template>
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </os-core:template>
    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="dataProcessor"/>
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:polling-container>

The most interesting part of the code above is the processData() method. Note that it is marked with the @SpaceDataEvent annotation. As you'll see in the wiring described below, this annotation marks the method that the event container invokes: the container takes a Data object from the space, the method performs its business logic (in our case, setting the processed attribute to true), and the returned object is written back to the space.

You might also have noticed that the DataProcessor implements IDataProcessor. This simple interface merely reflects the public methods of the DataProcessor and is used when the service is accessed remotely. In our example, as you can see in the diagram above, the DataRemoting service, which is part of one Processing Unit, makes a remote call to the DataProcessor, which is located in another Processing Unit (running in another process, or even on another machine). This is exactly why our Data object is Serializable.

If you take a look at the Configuration tab above, we first define the <bean> of the service; this is regular Spring configuration. Next we define an <os-events:polling-container>, and within it, an <os-events:listener>. This is where the annotation in the code comes into play: the listener element merely points to our service, and the @SpaceDataEvent annotation tells the container which method to execute. The polling container sets the mode of event handling, which by default is a take operation on the space. The <os-core:template> element defines the template to match in the space (in our case, a Data object with processed=false), as defined in the JavaSpaces specification.
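Conceptually, the polling container behaves like a loop that takes a matching object from the space and hands it to the annotated method, writing any returned value back. The sketch below models that contract with a plain queue standing in for the space; it illustrates the event-handling flow only, not the actual container implementation (which adds transactions, receive timeouts and concurrent consumers):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

// Simplified model of a polling container: take a matching entry,
// invoke the listener method, and write the result back to the space.
public class PollingSketch<T> {

    private final Queue<T> space = new ArrayDeque<>(); // stand-in for the space

    public void write(T entry) { space.add(entry); }

    // One polling cycle: "take" one entry (if any) and process it.
    public void pollOnce(Function<T, T> listener) {
        T entry = space.poll();            // take removes the entry from the space
        if (entry != null) {
            T result = listener.apply(entry); // the @SpaceDataEvent method
            if (result != null) {
                space.add(result);         // returned value is written back
            }
        }
    }

    public int size() { return space.size(); }
}
```

In this model the listener plays the role of processData(): it receives the taken Data object and its return value is written back to the space.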

DataProcessedCounter

The DataProcessedCounter is a service that shares a Processing Unit with the DataProcessor. It also contains a method that is marked with the @SpaceDataEvent annotation. However, there are two differences between the DataProcessedCounter and the DataProcessor. The minor difference is that the DataProcessedCounter processes Data objects with the processed attribute set to true (meaning objects that were already processed by the DataProcessor and written back to the space).

The major difference is that unlike the DataProcessor, which takes objects from the space in order to process them, the DataProcessedCounter is notified whenever these objects are written to or updated in the space. Moreover, instead of receiving a notification for each such event, the DataProcessedCounter receives batches of events, triggered by a number of objects, an interval in milliseconds, or both:

Code
public class DataProcessedCounter {

    AtomicInteger processedDataCount = new AtomicInteger(0);

    @SpaceDataEvent
    public void dataProcessed(Data data) {
        processedDataCount.incrementAndGet();
        System.out.println("*** PROCESSED DATA COUNT [" + processedDataCount + "] DATA [" + data + "]");
    }

    public int getProcessedDataCount() {
        return processedDataCount.intValue();
    }
}

Configuration
<bean id="dataProcessedCounter" class="org.openspaces.example.data.processor.DataProcessedCounter"/>

<os-events:notify-container id="dataProcessedCounterNotifyContainer" giga-space="gigaSpace" com-type="UNICAST">
    <os-events:notify write="true" update="true"/>
    <os-events:batch size="10" time="5000"/>
    <os-core:sql-query where="processed = true" class="org.openspaces.example.data.common.Data"/>
    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="dataProcessedCounter"/>
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:notify-container>

The code is only the implementation of the counter; everything else lives in the configuration, which means you can change the event-handling behavior without touching the code.

The <bean> is defined in a similar way, but now we use an <os-events:notify-container> instead of an <os-events:polling-container>, which means our listener now waits for notifications. The <os-events:notify write="true" update="true"/> element declares that we are interested only in write and update operations on the space. The <os-events:batch size="10" time="5000"/> element defines the batching of events per notification, in this case either 10 objects or 5 seconds, whichever comes first.

Unlike the DataProcessor configuration, which defines a template for the events we are interested in, for the DataProcessedCounter we define an SQL query, using the class type and the WHERE clause of the query. The SQL query is equivalent to the template we defined earlier; either way can be used.
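The size-or-time batching semantics of <os-events:batch size="10" time="5000"/> can be sketched as follows. This is an illustrative model, not the GigaSpaces implementation: a batch is delivered as soon as it reaches the configured size, or as soon as the configured time has elapsed since its first event, whichever comes first:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative size-or-time event batching.
public class BatchSketch<T> {

    private final int maxSize;
    private final long maxAgeMillis;
    private final List<T> buffer = new ArrayList<>();
    private long firstEventAt = -1;

    public BatchSketch(int maxSize, long maxAgeMillis) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
    }

    // Add an event; returns the batch to deliver, or null if neither
    // the size trigger nor the time trigger has fired yet.
    public List<T> offer(T event, long nowMillis) {
        if (buffer.isEmpty()) firstEventAt = nowMillis;
        buffer.add(event);
        boolean sizeTrigger = buffer.size() >= maxSize;
        boolean timeTrigger = nowMillis - firstEventAt >= maxAgeMillis;
        if (sizeTrigger || timeTrigger) {
            List<T> batch = new ArrayList<>(buffer);
            buffer.clear();
            return batch;
        }
        return null;
    }
}
```

With size=10 and time=5000, a burst of writes produces full batches of 10, while a slow trickle is flushed roughly every 5 seconds.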

DataFeeder

The DataFeeder is a Spring bean that repeatedly (every second) creates a new Data object with one of the four types (1, 2, 3 or 4), sets its processed value to false, and writes it to the space:

Code
public class DataFeeder implements InitializingBean, DisposableBean {
[..]
Data data = new Data(Data.TYPES[counter++ % Data.TYPES.length], "FEEDER " + Long.toString(time));
data.setProcessed(false);
gigaSpace.write(data);
[..]

Configuration
<bean id="dataFeeder" class="org.openspaces.example.data.feeder.DataFeeder"/>

The gigaSpace member is an instance of GigaSpace – the OpenSpaces object that is used to perform operations on the space. Note that the instance is created in a declarative manner:

Code
@GigaSpaceContext(name = "gigaSpace")
private GigaSpace gigaSpace;

Configuration
<!--
    Enables the usage of @GigaSpaceContext annotation based injection.
-->
<os-core:giga-space-context/>

<!--
    A bean representing a space (an IJSpace implementation).
    Note, we perform a lookup on the space since we are working against a remote space.
-->
<os-core:space id="space" url="jini://*/*/space" />

<!--
    OpenSpaces simplified space API built on top of IJSpace/JavaSpace.
-->
<os-core:giga-space id="gigaSpace" space="space"/>

In the Configuration tab above, we show how to define the space with its URL, and then define the GigaSpace instance that is based on that space. In this case, the space URL points to a remote space (because the space is located within the DataProcessor Processing Unit, which is remote to the Processing Unit in which the DataFeeder runs). This approach not only lets you configure the application easily, but also lets you completely change the underlying space topology or implementation without changing your application code.

Because the DataFeeder implements Spring's InitializingBean and DisposableBean interfaces, its afterPropertiesSet() and destroy() methods are called when it is created or destroyed, respectively.
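The lifecycle pattern can be sketched as follows, using a plain ScheduledExecutorService as a hypothetical stand-in for the feeder's internal timer (the real DataFeeder is wired by Spring, which invokes afterPropertiesSet() and destroy() at the corresponding points in the bean's lifecycle):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the feeder lifecycle: start a periodic task on init,
// stop it cleanly on destroy. The counter stands in for "write to space".
public class FeederLifecycleSketch {

    private final AtomicInteger fed = new AtomicInteger();
    private ScheduledExecutorService scheduler;

    // Spring calls the equivalent method once all dependencies are injected.
    public void afterPropertiesSet() {
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(fed::incrementAndGet, 0, 1, TimeUnit.SECONDS);
    }

    // Spring calls the equivalent method when the PU is undeployed.
    public void destroy() throws InterruptedException {
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
    }

    public int feedCount() { return fed.get(); }
}
```

The important point is symmetry: whatever afterPropertiesSet() starts, destroy() must stop, so that undeploying the Processing Unit leaves no threads behind.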

JMSDataFeeder

The JMSDataFeeder is similar to the DataFeeder. The difference between the beans is that the JMSDataFeeder uses Spring's JmsTemplate on top of the GigaSpaces JMS implementation to write the Data objects to the space; no space API is used directly. This is possible due to the usage of a MessageConverter that converts JMS messages into any required POJO type, in this case, Data. In this example, we configure the ConnectionFactory to use the ObjectMessage2ObjectConverter that comes with the GigaSpaces JMS implementation. The ObjectMessage2ObjectConverter receives a JMS ObjectMessage and returns the message's content (body) as the object to write to the space. The JMS ObjectMessage itself, including headers, properties etc., is not written. The JMSDataFeeder uses Spring's JmsTemplate and MessageCreator to send ObjectMessages that contain the Data objects, and the converter makes sure that only the contained Data objects are written.

The JMSDataFeeder is new in GigaSpaces version 6.0.1 and onwards. For more details on this feature, refer to the JMS-Space Interoperability section.

Code
public class JMSDataFeeder implements InitializingBean, DisposableBean {
    [..]
    final Data data = new Data(Data.TYPES[counter++ % Data.TYPES.length], "FEEDER " + Long.toString(time));
    data.setProcessed(false);
    jmsTemplate.send(new MessageCreator() {
        public Message createMessage(Session session) throws JMSException {
            return session.createObjectMessage(data);
        }
    });
    [..]
}

Configuration
<bean id="jmsDataFeeder" class="org.openspaces.example.data.feeder.JMSDataFeeder"/>

The JMSDataFeeder is injected with a Spring JmsTemplate. The JmsTemplate is injected with a JMS ConnectionFactory and a destination of type Queue. Unlike the DataFeeder, the JMSDataFeeder does not declare an instance of GigaSpace. GigaSpace is injected into the ConnectionFactory bean and is used behind the scenes by the JMS layer. In addition, the ConnectionFactory is injected with a MessageConverter of type ObjectMessage2ObjectConverter.

Code
public class JMSDataFeeder implements InitializingBean, DisposableBean {
    [..]
    /** Sets the JmsTemplate */
    public void setJmsTemplate(JmsTemplate jmsTemplate)
    {
        this.jmsTemplate = jmsTemplate;
    }
    [..]
}

Configuration
<bean id="jmsDataFeeder" class="org.openspaces.example.data.feeder.JMSDataFeeder">
    <property name="instanceId" value="${clusterInfo.instanceId}" />
    <property name="numberOfTypes" value="${numberOfTypes}" />
    <property name="jmsTemplate" ref="jmsTemplate" />
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="defaultDestination" ref="destination" />
</bean>

<os-jms:queue id="destination" name="MyQueue" />

<os-jms:connection-factory id="connectionFactory" giga-space="gigaSpace" message-converter="messageConverter" />

<bean id="messageConverter" class="com.j_spaces.jms.utils.ObjectMessage2ObjectConverter" />

Because the JMSDataFeeder implements Spring's InitializingBean and DisposableBean interfaces, its afterPropertiesSet() and destroy() methods are called when it is created or destroyed, respectively.

ViewDataCounter

The ViewDataCounter is a simple service that performs a count operation on the space every second.

Code
int count = gigaSpace.count(new Data());

Configuration
<bean id="viewDataCounter" class="org.openspaces.example.data.feeder.ViewDataCounter"/>

Note that the ViewDataCounter is configured in the same pu.xml file as the DataFeeder. However, there is one major difference in how the two services connect to the space: the DataFeeder uses a GigaSpace instance that accesses the space remotely, while the ViewDataCounter uses a different instance, backed by a local view of the space. A local view is like a local filter, defined on the client side, that registers for specific templates. Every object that is written to the space or updated in it and matches the template is immediately sent to that client's local view. In this way, different clients can keep different local caches according to their interests.

In this example, the ViewDataCounter is interested only in processed Data objects, so it defines a local view with a template and uses a GigaSpace instance that uses the local view.

Code
@GigaSpaceContext(name = "processedViewGigaSpace")
private GigaSpace gigaSpace;

Configuration
<os-core:local-view id="processedViewSpace" space="space">
	<os-core:view-query where="processed = true" class="org.openspaces.example.data.common.Data"/>
</os-core:local-view>

Because the ViewDataCounter implements Spring's InitializingBean and DisposableBean interfaces, its afterPropertiesSet() and destroy() methods are called when it is created or destroyed, respectively.
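The local-view behavior described above can be modeled as a client-side cache guarded by a predicate. The sketch below is illustrative only, with a java.util.function.Predicate standing in for the view-query; the real local view is maintained by the GigaSpaces runtime, which pushes matching writes and updates to the client:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Illustrative model of a local view: the client registers a filter, every
// matching write in the master space is mirrored into a local cache, and
// count() is then served locally instead of over the network.
public class LocalViewSketch<T> {

    private final Predicate<T> viewQuery;   // stand-in for "processed = true"
    private final List<T> localCache = new ArrayList<>();

    public LocalViewSketch(Predicate<T> viewQuery) {
        this.viewQuery = viewQuery;
    }

    // Invoked (conceptually) for every object written or updated in the space.
    public void onSpaceWrite(T obj) {
        if (viewQuery.test(obj)) {
            localCache.add(obj);
        }
    }

    // Served entirely from the client-side cache.
    public int count() { return localCache.size(); }
}
```

This is why the ViewDataCounter can count processed objects every second without paying a remote round trip each time.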

DataRemoting

DataRemoting demonstrates the remoting capabilities of OpenSpaces, using the space as the transport layer for the remote calls.

Implementing a remote service is quite straightforward: you just need to expose your remote interface and add the proper configuration. In our example, we want to be able to access the DataProcessor remotely, so we added the IDataProcessor interface. DataRemoting only uses IDataProcessor, and is completely unaware of the underlying space.

Code
[..]
private IDataProcessor dataProcessor;
[..]
public void setDataProcessor(IDataProcessor dataProcessor) {
	this.dataProcessor = dataProcessor;
}
[..]
	Data data = new Data(Data.TYPES[counter++ % Data.TYPES.length], "REMOTING " + Long.toString(time));
	data.setProcessed(false);
	System.out.println("--- REMOTING PARAMETER " + data);
	dataProcessor.sayData(data);
	data = dataProcessor.processData(data);
	System.out.println("--- REMOTING RESULT   " + data);
[..]

Configuration
<!--
	The OpenSpaces Remoting proxy based on the IDataProcessor interface. Uses the space as the transport layer.
-->
<os-remoting:proxy id="dataProcessor" giga-space="gigaSpace"
	interface="org.openspaces.example.data.common.IDataProcessor" timeout="15000">
	<os-remoting:routing-handler>
		<bean class="org.openspaces.example.data.feeder.support.DataRemoteRoutingHandler"/>
	</os-remoting:routing-handler>
</os-remoting:proxy>

<!--
	The DataRemoting bean, uses the proxied dataProcessor without any knowledge of the remoting invocation.
-->
<bean id="dataRemoting" class="org.openspaces.example.data.feeder.DataRemoting">
	<property name="dataProcessor" ref="dataProcessor"/>
</bean>
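Conceptually, space-based remoting works by writing an invocation entry to the space and taking back a matching result entry; on the service side, the invocation is taken, the real bean is called, and the result is written. The sketch below models that round trip with plain queues and a synchronous in-process call; it illustrates the transport pattern only, not the OpenSpaces remoting implementation (which handles routing, correlation and timeouts):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of space-based remoting: the space acts as the transport layer
// between the client-side proxy and the remote service.
public class RemotingSketch {

    private final Queue<String> invocations = new ArrayDeque<>(); // invocation entries
    private final Queue<String> results = new ArrayDeque<>();     // result entries

    // Client side: what the os-remoting proxy does, conceptually.
    public String invoke(String argument) {
        invocations.add(argument);   // write the invocation entry to the space
        serviceSide();               // in reality this runs in the remote PU
        return results.poll();       // take the matching result entry
    }

    // Service side: the exporter takes invocations and calls the real bean.
    private void serviceSide() {
        String arg = invocations.poll();
        // Stand-in for the real call, e.g. dataProcessor.processData(...)
        results.add("PROCESSED : " + arg);
    }
}
```

Because both sides only see space entries, the client needs no knowledge of where the service runs; the routing handler in the configuration above decides which partition receives the invocation entry.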

Building and Packaging

This example includes a build.xml Ant file and a build.bat/sh script to execute Ant (there is no need to pre-install Ant; the Ant jars are already bundled in the <GigaSpaces Root>\lib directory).

From the <Example Root> directory (<GigaSpaces Root>\examples\openspaces\data) call:

build build

This compiles the code to a pu directory and copies the Processing Unit Deployment Descriptor, pu.xml.

The Deployment Descriptor should always reside under the META-INF\spring directory of your application.

Deployment

There are several ways to deploy Processing Units, two of which have already been explained in the Hello World Example. In this example, we show how to deploy the Processing Units onto the Service Grid using the GigaSpaces Management Center.

After you have run the build, the next step in preparing your deployment is to copy the Processing Unit directories into the <GigaSpaces Root>\deploy directory.

Under <GigaSpaces Root>\examples\openspaces\data, you can find three directories: common, feeder and processor. The last two contain the two Processing Units, while common contains the classes that both Processing Units use: in this case the Data object (our POJO domain model) and the IDataProcessor interface, which is needed for the remoting part.

In each of the other directories (feeder and processor) there is a directory called pu\PU_NAME, where PU_NAME is the actual Processing Unit name (in our case, data-feeder and data-processor). These names are used later when deploying the Processing Units to the Service Grid. These are also the two directories you need to copy to <GigaSpaces Root>\deploy. But before you copy them, let's review their contents:

META-INF\spring is where pu.xml resides. shared-lib is the place to put additional classes (in a JAR file) that are used by the Processing Unit (like our Data and IDataProcessor classes, as well as any other external libraries). The rest are the compiled classes of the Processing Unit, under their appropriate package path (which in this example starts with org).

After you have copied the two directories, start up the Service Grid. Because this example runs in a partitioned cluster with two primary spaces and one backup space for each partition, you need to run one Grid Service Manager (GSM) and two Grid Service Containers (GSCs), and then start the GigaSpaces Management Center:

Start <GigaSpaces Root>\bin\gsm
Start <GigaSpaces Root>\bin\gsc
Start <GigaSpaces Root>\bin\gsc
Start <GigaSpaces Root>\bin\gs-ui

In the UI, click the Deployments tab, then Details, and then click the Deploy new application button.

In the Deployment Wizard, choose SBA Application - Processing Unit and click Next.

Now, all you need to do is type the name of the Processing Unit (identical to the name of the folder that is now in the deploy directory) in the Processing Unit Name field and click Deploy.

Since we have two Processing Units, you need to repeat this process twice: once for the data-processor and once for the data-feeder. There is no need to change any other value in the wizard.

Since the spaces are running within the data-processor, it makes sense to deploy it first and the data-feeder second.

Now, check out the GSC consoles to see the events flowing in and out!

Really liked the example? Wasn't it as helpful as you hoped? Either way, write to us and tell us about it.
