Tutorial summary: Learn how to create and run a Processing Unit (a scalable unit of deployment) inside your development environment, and how to use the basic GigaSpaces API by implementing a simple processor and feeder application. Approx. 10 min

Overview

Example Folder (Java) - Sample application is located in <GigaSpaces Root>\examples\helloworld

Example Folder (.NET) - Sample application is located in <GigaSpaces.NET Root>\Examples\ProcessingUnit

Features Introduced - Space, GigaSpaces API, Processing Unit, Polling Container, Running inside the IDE.


Before you begin

We recommend that you go through the following steps before you begin this tutorial:

  Java:
  1. Download GigaSpaces and set up your development environment to work with GigaSpaces - this is needed to run the sample application described in this tutorial.
  2. Step One - Using Processing Units For Scaling - a short, one page introduction to scaling your application, using Processing Units - Recommended.

  .NET:
  1. Download GigaSpaces XAP .NET - this is needed to run the sample application described in this tutorial.
  2. Step One - Using Processing Units For Scaling - a short, one page introduction to scaling your application, using Processing Units - Recommended.

Goals

Create a scalable processor application (a Processing Unit) that processes objects as they are written to the space (data grid). Create a feeder application that feeds objects to the space (data grid) for the processor to read and process.

Contents

  Java:
  1. The Message class to be fed and processed.
  2. The simple scalable processor application.
  3. The feeder application.
  4. Running the processor inside your IDE.
  5. Running the feeder inside your IDE.
  6. Expected output

  .NET:
  1. The Data class to be fed and processed.
  2. The simple scalable processor application.
  3. The feeder application.
  4. Running the processor and feeder inside your IDE.
  5. Expected output



Components

There are three components in our scenario:

  1. Processor Processing Unit -
    Processes Message objects as they are written to the data grid (Space).

    Java: It contains 3 components:
    an embedded space, a Polling Container component that listens to new Message objects written to the Space, and a Processor Bean that is delegated the actual processing work by the Polling Container.

    The Processing Unit itself runs within a dedicated processing unit container in a host environment. (This can be your IDE, any Java process, or the GigaSpaces Grid Service Container - more on this in the next tutorial.)

    .NET: It contains 3 components:
    an embedded space, a Polling Container component that listens to new Message objects written to the Space, and a Processor that is delegated the actual processing work by the Polling Container.

    The Processing Unit itself runs within a dedicated processing unit container in a host environment. (This can be your IDE, any .NET process, or the GigaSpaces Grid Service Container - more on this in the next tutorial.)

  2. Feeder -
    Java: an application that feeds unprocessed Message objects to the Space, reads one processed object and, after a short wait, counts the processed objects in the Space.
    .NET: an application that feeds unprocessed Message objects to the Space.

  3. Message Object -
    Java: a simple POJO with id and info attributes.
    .NET: a simple object (the Data class) with type, info and processed properties.

The Workflow

Java:

  1. The helloFeeder application writes 1000 Message objects (POJOs) to the space.

  2. Inside the Processing Unit, the Polling Container continuously removes unprocessed objects from the data grid (one at a time) and hands them to its Processor Bean for processing.

  3. After each Message object has been processed, the Polling Container writes it back to the Space.
    (Steps 2 & 3 repeat until there are no more unprocessed Message objects left in the Space.)

  4. The feeder reads one processed Message object, waits 100 milliseconds to allow more objects to be processed, and then counts all the processed Message objects in the Processor Processing Unit's Space.

.NET:

  1. The Feeder application continuously writes Message objects to the space.

  2. Inside the Processing Unit, the Polling Container continuously removes unprocessed objects from the data grid (one at a time) and hands them to its Processor for processing.

  3. After each Message object has been processed, the Polling Container writes it back to the Space.
    (Steps 2 & 3 repeat until there are no more unprocessed Message objects left in the Space.)

Diagram 1. The application connects to the space, writes, reads and takes a Message Object.
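The workflow described above can be modeled without GigaSpaces at all. The following is a minimal, hypothetical sketch in plain Java. An in-memory list stands in for the space, and `SimpleMessage`, `InMemorySpace`, `take`, `count` and `WorkflowDemo` are illustrative names, not the GigaSpaces API. A single consumer thread plays the Polling Container's role: it takes unprocessed objects one at a time, appends "World !!" and writes them back.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the Message object (not the tutorial's class).
class SimpleMessage {
    final int id;
    String info;

    SimpleMessage(int id, String info) {
        this.id = id;
        this.info = info;
    }
}

// Hypothetical in-memory "space": a synchronized list with write, take and count.
class InMemorySpace {
    private final List<SimpleMessage> entries = new ArrayList<>();

    synchronized void write(SimpleMessage msg) {
        entries.add(msg);
        notifyAll(); // wake up a consumer waiting in take()
    }

    // Removes and returns the first entry whose info equals the given value,
    // waiting up to timeoutMillis for one to appear; returns null on timeout.
    synchronized SimpleMessage take(String info, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            Iterator<SimpleMessage> it = entries.iterator();
            while (it.hasNext()) {
                SimpleMessage m = it.next();
                if (m.info.equals(info)) {
                    it.remove();
                    return m;
                }
            }
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return null;
            }
            wait(remaining);
        }
    }

    synchronized int count(String info) {
        int n = 0;
        for (SimpleMessage m : entries) {
            if (m.info.equals(info)) {
                n++;
            }
        }
        return n;
    }
}

class WorkflowDemo {
    // Feeds numberOfMessages objects while one consumer thread processes them,
    // then returns the number of processed objects in the space.
    static int run(int numberOfMessages) {
        InMemorySpace space = new InMemorySpace();

        // Consumer: plays the role of the Polling Container and its Processor.
        // It stops once no unprocessed object has appeared for 500 ms.
        Thread consumer = new Thread(() -> {
            try {
                SimpleMessage m;
                while ((m = space.take("Hello ", 500)) != null) {
                    m.info = m.info + "World !!"; // the "processing" step
                    space.write(m);               // write the processed object back
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Feeder: writes unprocessed objects to the space.
        for (int i = 0; i < numberOfMessages; i++) {
            space.write(new SimpleMessage(i, "Hello "));
        }

        // Unlike the tutorial's fixed 100 ms wait, wait until the consumer is idle,
        // so every object has been processed before counting.
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return space.count("Hello World !!");
    }
}
```

Counting after a fixed 100 ms sleep, as the tutorial's feeder does, can report fewer objects than were written because processing may still be in progress. The expected output later on this page shows 841 of 1000.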


If you want to see the final outcome of this tutorial now, jump ahead and run the sample application.

Code Walkthrough

The code in this page uses Java JDK 1.5 or higher. If you are using Java JDK 1.4, use the 1.4 version of this page.

Code Walkthrough (JDK1.4)

The code in this page uses Java JDK 1.4. If you are using Java JDK 1.5 or higher, use the 1.5 version of this page.

First, let's take a look at the Message object that the feeder application writes to the space:

Code Walkthrough (.NET)

The Object (Data.cs)

This is a simple object containing three properties: Type, which represents the data type and is used as the routing index inside the cluster; Info, which represents the information that this object holds; and Processed, a boolean that indicates whether the object has been processed. Each property has a getter and a setter. The class also has a simple constructor and a default (empty) constructor.

Listing 1. The Object

/// <summary> Represents a data object </summary>
public class Data
{
	#region Members
	private string _info;		
	private Nullable<int> _type;
	private bool _processed;
	#endregion

	#region Properties
	/// <summary> Gets the data type property,
	/// this property will be used as the routing index 
	/// inside the cluster </summary>
	[SpaceRouting]
	public Nullable<int> Type
	{
		get { return _type; }
		set { _type = value; }
	}
	/// <summary> Gets the data info property</summary>
	public string Info
	{
		get { return _info; }
		set { _info = value; }
	}
	/// <summary> Gets or sets the data processed state </summary>
	public bool Processed
	{
		get { return _processed; }
		set { _processed = value; }
	}
	#endregion

	#region Constructors
	/// <summary> Constructs a new data object </summary>
	/// <param name="info">Data's info</param>
	/// <param name="type">Data's type</param>
	public Data(string info, Nullable<int> type)
	{
		_info = info;
		_type = type;
		_processed = false;
	}

	public Data()
	{
	}
	#endregion
}

The Message Object (Message.java)

This is a simple POJO containing two attributes: id, which represents the object id, and info, which represents the information that this object holds. Both have setter and getter methods.

The getter for the id attribute is annotated with the @SpaceRouting annotation that is used to route Message objects when they are written to the space. This is necessary for scaling the application, and will be explained in the next tutorial. For now, just remember that this annotation should decorate one of the object's properties.

private Integer id;    // object id

public void setId(Integer id) {
    this.id = id;
}

@SpaceRouting
public Integer getId() {
    return id;
}
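To build some intuition for why one property is designated for routing, here is a simplified, hypothetical model rather than GigaSpaces code. It assumes that an object goes to the partition given by the routing value's hash code modulo the number of partitions, kept non-negative. Under that assumption, objects with the same routing value always land on the same partition. `RoutingSketch`, `partitionFor` and `distribute` are illustrative names. The actual routing behavior is covered in the next tutorial.

```java
import java.util.HashMap;
import java.util.Map;

class RoutingSketch {
    // Assumed routing rule: hash code of the routing value modulo the partition count.
    // Math.floorMod keeps the result non-negative even for negative hash codes.
    static int partitionFor(Object routingValue, int numberOfPartitions) {
        return Math.floorMod(routingValue.hashCode(), numberOfPartitions);
    }

    // Counts how many ids 0..count-1 would land on each partition.
    static Map<Integer, Integer> distribute(int count, int numberOfPartitions) {
        Map<Integer, Integer> perPartition = new HashMap<>();
        for (int id = 0; id < count; id++) {
            int p = partitionFor(Integer.valueOf(id), numberOfPartitions);
            perPartition.merge(p, 1, Integer::sum);
        }
        return perPartition;
    }
}
```

Because `Integer.hashCode()` is the integer value itself, the 1000 Message ids 0 to 999 would split evenly under this model: 500 per partition with two partitions.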

(JDK 1.4) The id attribute is designated as the routing index and is used to route Message objects when they are written to the space. Since annotations are not available in JDK 1.4, the attribute is marked as the routing index in the XML mapping file Message.gs.xml, located alongside the Message source file. This is necessary for scaling the application, and will be explained in the next tutorial. For now, just remember that one of the object's properties should be designated for routing.

private Integer id;    // object id

public void setId(Integer id) {
    this.id = id;
}

public Integer getId() {
    return id;
}

private String info;    // info represents the info the object holds

public String getInfo() {
    return info;
}

public void setInfo(String info) {
    this.info = info;
}

The class also needs a default (empty) constructor, plus a constructor that creates a new Message object with a given id and info:

public Message() {   // Mandatory empty constructor         

}

public Message(Integer id, String info) {
    this.id = id;

    this.info = info;
}
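Putting the fragments together, a complete Message class might look like the sketch below. The `toString()` format is an assumption inferred from the expected output later on this page (`id[47] info[Hello World !!]`). The @SpaceRouting annotation appears only as a comment, so the class compiles without the GigaSpaces libraries on the classpath.

```java
// Sketch of the complete Message POJO (in a real project: public, in Message.java).
class Message {
    private Integer id;    // object id, used as the routing property
    private String info;   // the information the object holds

    public Message() {     // mandatory empty constructor (also used to build templates)
    }

    public Message(Integer id, String info) {
        this.id = id;
        this.info = info;
    }

    // @SpaceRouting  (omitted so the sketch compiles without GigaSpaces)
    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getInfo() { return info; }
    public void setInfo(String info) { this.info = info; }

    // Assumed format, matching "id[47] info[Hello World !!]" in the expected output.
    @Override
    public String toString() {
        return "id[" + id + "] info[" + info + "]";
    }
}
```

Leaving `id` null in an object created with the empty constructor is what makes it useful as a template: the null field does not restrict which objects match.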

Next, let's take a look at the Processor Processing Unit.

The Processor Processing Unit (DataProcessor.cs, pu.config, Processor.cs)

The Processor Processing Unit contains two components: a space (cache), which holds objects in the runtime process memory, and a processor that takes, modifies and writes objects back to this space.

Processor Processing Unit Configuration (pu.config, DataProcessor.cs)

A Processing Unit always has a configuration file called pu.config, which defines the space the Processing Unit interacts with. Let's take a look at this file:
Listing 2. Processor Processing Unit pu.config file

<configuration>
  .
  .
  <appSettings>
    <!-- URL of the space used by the processing unit. The "/./" prefix states that this space will be embedded
    in the processing unit (it will run collocated in its runtime process) -->
    <add key="SpaceUrl" value="/./dataExampleSpace"/>
    <!-- UseTransactions indicates whether the operations performed against the space will use transactions -->
    <add key="UseTransactions" value="false"/>
  </appSettings>

</configuration>

The rest of the configuration is defined inside the DataProcessor class:

public class DataProcessor : IProcessingUnitContainer
{
    #region Members

    /// <summary> Proxy to the space of this processing unit, used to start an embedded space </summary>
    private ISpaceProxy _proxy;

    /// <summary> The event listener container that executes the processor logic defined in Processor.cs </summary>
    private volatile IEventListenerContainer<Data> _eventListenerContainer;

    /// <summary> Cluster info related to this processing unit deployment </summary>
    private ClusterInfo _clusterInfo;

    /// <summary> Custom properties used to initialize the processing unit; these are
    /// extracted from the <appSettings> section of the pu.config file </summary>
    private IDictionary<string, string> _properties;

    #endregion

    #region IProcessingUnit Members

    ///<summary> Initializes the processing unit:
    ///Starts the embedded space and a polling container connected to it, which executes the processor logic</summary>
    public void Initialize()
    {
        //Get the space URL from the properties (defined in the pu.config file)
        string spaceUrl;
        _properties.TryGetValue(Consts.SpaceUrlProperty, out spaceUrl);
        if (String.IsNullOrEmpty(spaceUrl))
            throw new ArgumentNullException("spaceUrl", "Custom properties must contain a space URL property (" + Consts.SpaceUrlProperty + ")");
        //Get the use-transactions state from the properties (defined in the pu.config file)
        string useTransactionStr;
        _properties.TryGetValue(Consts.UseTransactionsProperty, out useTransactionStr);
        bool useTransactions = (String.IsNullOrEmpty(useTransactionStr) ? false : bool.Parse(useTransactionStr));

        //Start the space as part of the cluster; the DataProcessor takes data from its embedded space only
        SpaceConfig spaceConfig = new SpaceConfig();
        spaceConfig.ClusterInfo = _clusterInfo;
        ISpaceProxy clusterProxy = SpaceProxyProviderFactory.Instance.FindSpace(spaceUrl, spaceConfig);
        //Get the direct proxy from the cluster proxy; it will be used by this processing unit
        _proxy = clusterProxy.GetServerAdmin().GetDirectProxy();

        //Create the event listener container, which runs the logic of the Processor class (Processor.cs)
        _eventListenerContainer = EventListenerContainerFactory.CreateContainer<Data>(_proxy, new Processor());
        //If transactions should be used, supply the container with a local transaction manager
        if (useTransactions)
            _eventListenerContainer.TransactionManager = _proxy.LocalTransactionManager;
        //Start the event listener container
        _eventListenerContainer.Start();

        Console.WriteLine("DataProcessor started, waiting for data that needs processing...");
    }

    ///<summary> Destroys the processing unit; any allocated resources should be cleaned up in this method </summary>
    public void Dispose()
    {
        if (_eventListenerContainer != null)
        {
            _eventListenerContainer.Stop();
            _eventListenerContainer.Dispose();
        }
        //This processing unit created the space, therefore it should also close it
        //by calling Shutdown on the server admin
        if (_proxy != null)
        {
            _proxy.GetServerAdmin().Shutdown();
            //Dispose the space proxy to clean up any used resources
            _proxy.Dispose();
        }
    }

    /// <summary> Contains the cluster information for this processing unit instance.
    /// This property is set at deploy time by the service grid. </summary>
    public ClusterInfo ClusterInfo
    {
        set { _clusterInfo = value; }
    }

    /// <summary> Contains deploy-time context properties and any additional properties passed by the user.
    /// This property is set at deploy time by the service grid. </summary>
    public IDictionary<string, string> Properties
    {
        set { _properties = value; }
    }

    #endregion
}
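The settings handling at the top of Initialize() follows a common pattern: a required SpaceUrl setting and an optional UseTransactions flag that defaults to false. For readers following the Java track, the sketch below mirrors that logic. `PuSettings` and `fromProperties` are hypothetical names. The key strings "SpaceUrl" and "UseTransactions" are taken from pu.config and are assumed to be the values of the Consts fields. The strict check imitates C#'s bool.Parse, which accepts only true or false, case-insensitively.

```java
import java.util.Map;

// Hypothetical Java mirror of the settings validation in DataProcessor.Initialize().
class PuSettings {
    final String spaceUrl;
    final boolean useTransactions;

    private PuSettings(String spaceUrl, boolean useTransactions) {
        this.spaceUrl = spaceUrl;
        this.useTransactions = useTransactions;
    }

    static PuSettings fromProperties(Map<String, String> props) {
        // Required: the URL of the space the processing unit works with
        String spaceUrl = props.get("SpaceUrl");
        if (spaceUrl == null || spaceUrl.isEmpty()) {
            throw new IllegalArgumentException("Properties must contain a SpaceUrl property");
        }
        // Optional: defaults to false when missing or empty, rejects anything but true/false
        boolean useTransactions = false;
        String tx = props.get("UseTransactions");
        if (tx != null && !tx.isEmpty()) {
            String t = tx.trim();
            if (t.equalsIgnoreCase("true")) {
                useTransactions = true;
            } else if (!t.equalsIgnoreCase("false")) {
                throw new IllegalArgumentException("UseTransactions must be true or false: " + tx);
            }
        }
        return new PuSettings(spaceUrl, useTransactions);
    }
}
```

Failing fast on a missing URL, as the original does, surfaces a misconfigured deployment when the unit starts rather than at the first space operation.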

Next, we'll see the source code for the Processor.

The Processor (Processor.cs)

The Processor class contains the actual processing logic. Marking it with the PollingEventDriven attribute instructs the event listener container to act as a polling container. The container repeatedly tries to take objects matching the template returned by the property marked with EventTemplate: here, UnprocessedData, which matches Data objects whose Processed property is false. It then runs the method marked with DataEventHandler (here, ProcessData) on each object it takes, and the object that ProcessData returns is written back to the space. In this example, ProcessData simulates 100 milliseconds of work and sets the Processed property to true:
Listing 3. Processor.cs

  .
  .
    /// <summary> This class contains the actual processing logic.
    /// It is triggered by the event listener container inside the DataProcessor. Marking it as
    /// PollingEventDriven instructs the event listener container to act as a polling container: it
    /// repeatedly tries to take objects matching the property marked as EventTemplate (here, the
    /// UnprocessedData property) and runs the method marked as DataEventHandler (here, the
    /// ProcessData method) on each taken object </summary>
    [PollingEventDriven(MinConcurrentConsumers = 1, MaxConcurrentConsumers = 4)]
    internal class Processor
    {
        #region Template
        /// <summary> This property is marked as the template; the event listener container will try to take
        /// data objects whose Processed property holds the value false </summary>
        [EventTemplate]
        public Data UnprocessedData
        {
            get
            {
                Data template = new Data();
                template.Processed = false;
                return template;
            }
        }
        #endregion

        #region Processing methods
        /// <summary> Fake delay that represents processing time </summary>
        private const int ProcessingTime = 100;

        /// <summary> This method will be activated by the event listener container on any data object taken;
        /// the returned data object will be written to the space </summary>
        /// <param name="data">Data received</param>
        /// <returns>Processed data</returns>
        [DataEventHandler]
        public Data ProcessData(Data data)
        {
            Console.WriteLine("**** processing - Took element with info: " + data.Info);
            //Process data...
            Thread.Sleep(ProcessingTime);
            //Set data state to processed
            data.Processed = true;
            Console.WriteLine("**** processing - done");
            return data;
        }
        #endregion
    }
}
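The PollingEventDriven(MinConcurrentConsumers = 1, MaxConcurrentConsumers = 4) attribute lets the container run several consumers that take and process objects in parallel. The sketch below is a simplified, hypothetical Java model of that idea, not the XAP.NET implementation. It always runs a fixed pool of consumer threads and does not scale between a minimum and a maximum. A BlockingQueue stands in for the space, and a sleep per object stands in for the fake ProcessingTime delay. `ConcurrentConsumersSketch` and `process` are illustrative names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentConsumersSketch {
    // Processes `items` queued objects with a fixed pool of `consumers` threads.
    // Each consumer keeps taking objects until none arrives for 200 ms and
    // simulates work by sleeping processingMillis per object.
    // Returns the number of objects processed.
    static int process(int items, int consumers, long processingMillis) {
        BlockingQueue<Integer> unprocessed = new LinkedBlockingQueue<>();
        for (int i = 0; i < items; i++) {
            unprocessed.add(i);
        }

        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            pool.execute(() -> {
                try {
                    // poll() with a timeout plays the role of a blocking take
                    while (unprocessed.poll(200, TimeUnit.MILLISECONDS) != null) {
                        Thread.sleep(processingMillis); // fake processing delay
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown(); // accept no new tasks; let the consumers finish
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }
}
```

With 8 objects, 4 consumers and a 100 ms delay, the work takes roughly two rounds of processing instead of eight, plus the 200 ms idle timeout.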


The Processor Processing Unit (pu.xml, Processor.java)

The Processor Processing Unit contains two components: a space (cache), which holds objects in memory, and a processor bean that takes, modifies and writes objects back to this space.

Processor Processing Unit Configuration (META-INF/spring/pu.xml)

A Processing Unit always has an XML file called pu.xml, which resides under the META-INF/spring directory.
In fact, this is a standard Spring framework XML configuration file with a number of custom GigaSpaces-specific tags. Let's take a look at this file. In our example, there are 3 main components contained within the Processing Unit:

  1. The first component is a space (cache) instance embedded inside the Processing Unit, named processorSpace. It has a URL property.
    On the second line, we define a transaction manager, which references this space and manages its transactions.
    Finally a bean called gigaSpace wraps the space, and provides a simple client API to interact with it, as we will see later in this tutorial.
    <os-core:space id="space" url="/./processorSpace" />
    <os-core:local-tx-manager id="transactionManager" space="space"/>
    <os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/>
  2. The second component is a helloProcessor Bean, which contains the method that does the actual processing. This bean is defined in the Processor.java source file, which is shown in the next section.
    <bean id="helloProcessor" class="org.openspaces.example.helloworld.processor.Processor"/>
  3. The third, key component in this workflow is the Polling Container, which continuously removes (takes) objects matching certain criteria from the space. The criteria are expressed in the form of a template object (also known as example object). In our case, the polling container is instructed to take objects of type Message. However, it does not take all instances of the Message class, only those whose "info" property equals the string "Hello ". When a match is found, the object is taken and passed to a listener bean - here the listener is the previously defined Processor bean. This bean has a method annotated with the @SpaceDataEvent annotation, which is invoked with the taken object as a parameter. It returns a processed Message object, which is written back to the space by the Polling Container.

    <os-events:polling-container id="helloProcessorPollingEventContainer" giga-space="gigaSpace">
        <os-events:tx-support tx-manager="transactionManager"/>
        <os-core:template>
            <bean class="org.openspaces.example.helloworld.common.Message">
                <property name="info" value="Hello "/>
            </bean>
        </os-core:template>
        <os-events:listener>
            <os-events:annotation-adapter>
                <os-events:delegate ref="helloProcessor"/>
            </os-events:annotation-adapter>
        </os-events:listener>
    </os-events:polling-container>
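The matching rule behind the template can be illustrated in plain Java. This hypothetical helper assumes the usual template-matching convention: a template property left null is a wildcard, and a non-null property must equal the candidate's value. With the template above (info = "Hello ", id left unset), any Message whose info is exactly "Hello " matches, whatever its id. `TemplateMatching`, `Msg` and `matches` are illustrative names, not GigaSpaces API.

```java
import java.util.Objects;

class TemplateMatching {
    // Minimal stand-in for the Message POJO: null means "not set".
    static final class Msg {
        final Integer id;
        final String info;

        Msg(Integer id, String info) {
            this.id = id;
            this.info = info;
        }
    }

    // A candidate matches when every non-null template property equals the
    // candidate's corresponding property; null template properties match anything.
    static boolean matches(Msg template, Msg candidate) {
        return fieldMatches(template.id, candidate.id)
            && fieldMatches(template.info, candidate.info);
    }

    private static boolean fieldMatches(Object templateValue, Object candidateValue) {
        return templateValue == null || Objects.equals(templateValue, candidateValue);
    }
}
```

The same rule explains why the feeder's readResults() template (info = "Hello World !!") selects only processed objects.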

(JDK 1.4) Annotations are not available in JDK 1.4, so the Polling Container's listener is configured with a method adapter instead. It names the Processor bean's processMessage() method explicitly, and that method is invoked with each taken object. Otherwise, the Polling Container works as described above: it takes Message objects whose "info" property equals "Hello " and writes the returned, processed object back to the space.

<os-events:polling-container id="helloProcessorPollingEventContainer" giga-space="gigaSpace">
    <os-events:tx-support tx-manager="transactionManager"/>
    <os-core:template>
        <bean class="org.openspaces.example.helloworld.common.Message">
            <property name="info" value="Hello "/>
        </bean>
    </os-core:template>
    <os-events:listener>
        <os-events:method-adapter method-name="processMessage">
            <os-events:delegate ref="Processor"/>
        </os-events:method-adapter>
    </os-events:listener>
</os-events:polling-container>

Next we'll see the source code for the Processor bean.

Processor Bean (Processor.java)

We saw that the pu.xml file defines a Processor bean (with the id helloProcessor). Now let's look at this bean's source code.
It has one business method, processMessage(), which is annotated with the @SpaceDataEvent annotation. Previously we saw that the Processor bean is referenced by the Polling Container and acts as its listener. When the Polling Container takes a Message object from the space, it invokes this method with the object as an argument. The method returns the processed object; in this example, it simply appends the string "World !!" to the object's info property:

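package org.openspaces.example.helloworld.processor;

import org.openspaces.events.adapter.SpaceDataEvent;
import org.openspaces.example.helloworld.common.Message;

public class Processor {

    // Invoked by the Polling Container with each taken Message;
    // the returned object is written back to the space
    @SpaceDataEvent
    public Message processMessage(Message msg) {
        System.out.println("Processor PROCESSING : " + msg);
        msg.setInfo(msg.getInfo() + "World !!");
        return msg;
    }

    public Processor() {
        System.out.println("Processor instantiated...");
    }
}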

Now we are ready to look at the feeder application, which feeds Message objects to the space.

The Feeder Application (Feeder.java)

The feeder's main method constructs a new Feeder instance and passes it the space URL, which it uses to connect to the space.
Each space is uniquely identified by its name. In the processor processing unit, we defined the space with the URL "/./processorSpace", which means an embedded space named "processorSpace". Therefore, the URL the feeder uses to connect to the space is "jini://*/*/processorSpace". Without getting into the details, this means that the Jini protocol is used to discover a space named "processorSpace" on the network. The URL is passed to the feeder as a program argument.
The feeder then writes Message objects to the space and reads the results back.

public static void main(String[] args) {
    if (args.length == 0) {
        System.out.println("Usage: java Feeder <space URL>");
        System.exit(1);
    }

    Feeder feeder = new Feeder(args[0]);   // create the feeder and connect it to the space

    feeder.feed(1000);   // run the feeder (start feeding)

    feeder.readResults();   // read back results
}
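The two URL forms used in this tutorial differ in where the space lives. "/./processorSpace" starts an embedded space inside the current process, while "jini://*/*/processorSpace" looks up an existing space with that name over the network. The helper below is a hypothetical illustration of that distinction only. GigaSpaces parses URLs itself (the feeder passes the URL to UrlSpaceConfigurer), and real URLs can carry more components and parameters than this sketch handles. `SpaceUrlSketch`, `kindOf` and `spaceName` are illustrative names.

```java
class SpaceUrlSketch {
    enum Kind { EMBEDDED, REMOTE_LOOKUP }

    // Classifies a space URL by its prefix (simplified, hypothetical helper).
    static Kind kindOf(String url) {
        if (url.startsWith("/./")) {
            return Kind.EMBEDDED;        // space runs inside this process
        }
        if (url.startsWith("jini://")) {
            return Kind.REMOTE_LOOKUP;   // discover an existing space on the network
        }
        throw new IllegalArgumentException("Unsupported URL form: " + url);
    }

    // Returns the space name: the last path segment, ignoring any "?..." suffix.
    static String spaceName(String url) {
        int q = url.indexOf('?');
        String path = (q >= 0) ? url.substring(0, q) : url;
        return path.substring(path.lastIndexOf('/') + 1);
    }
}
```

In this tutorial, the processor's pu.xml uses the embedded form, and the feeder receives the lookup form as its program argument. Both resolve to the same space name, processorSpace.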

The Feeder constructor connects to the Processor Processing Unit's space using the input URL:

public Feeder(String url){
     // Connect to a space using the url
     IJSpace space = new UrlSpaceConfigurer(url).space();
                
     // Wrap the space with the gigaSpace API
     this.gigaSpace = new GigaSpaceConfigurer(space).gigaSpace();
}

The feed() method loops and writes Message objects to the space by using the gigaSpace.write() method:

public void feed(int numberOfMessages){
    for(int counter=0;counter<numberOfMessages;counter++){
        Message msg = new Message(counter, "Hello ");
        gigaSpace.write(msg);
    }
    System.out.println("FEEDER WROTE " + numberOfMessages + " messages");
}

JDK 1.4 version (no autoboxing, so the id is wrapped in an Integer explicitly):

public void feed(int numberOfMessages){
    for(int counter=0;counter<numberOfMessages;counter++){
        Message msg = new Message(new Integer(counter), "Hello ");
        gigaSpace.write(msg);
    }
    System.out.println("Feeder WROTE " + numberOfMessages + " messages");
}

Here's how the results are read back using template matching. The feeder reads one processed object (one whose info property is "Hello World !!") and prints it. It then waits 100 milliseconds and counts and prints the number of processed objects in the space:

public void readResults(){

    Message template = new Message();          // Create a template to read a Message with info
    template.setInfo("Hello World !!");        // attribute that equals "Hello World !!"

    // Read an object matching the template
    System.out.println("Here is one of them printed out: "+gigaSpace.read(template));

    //wait 100 millis for all to be processed:
    try{
         Thread.sleep(100);
    }catch(InterruptedException ie){ /*do nothing*/}
		
    // Count number of objects in the space matching the template
    int numInSpace=gigaSpace.count(template);
                
    System.out.println("There are "+numInSpace+" processed Message objects in the space now.");
}

Next, we compile and run the sample application.

Compiling and Running the Application within your IDE

Steps to run the application inside the Eclipse IDE:

If you haven't already done so, download GigaSpaces and set up your development environment - this is needed for running the tutorial sample application.

Importing the project into Eclipse

  1. Import the hello-common, hello-processor and hello-feeder projects located under the <GigaSpaces Root>/examples/helloworld folder.
    (After importing, you'll see some errors since the GS_HOME path variable is not set yet)
  2. Create a new Eclipse environment variable called GS_HOME, and point it to your GigaSpaces installation Root folder

Running the Processor

3. From the toolbar at the top of the screen, select Run > Run Dialog... to open the Run dialog
4. Click the + to the left of Java Application, to unfold it
5. Select the Hello Processor launch configuration, and click the Run button

Waiting for the Processor to instantiate

6. Before running the feeder, you should wait for the following output to appear in the Console tab at the bottom of the screen:

Processor instantiated, waiting for messages feed...

This indicates the Processor is up and running

Running the Feeder

7. From the toolbar at the top of the screen, select Run > Run Dialog... to open the Run dialog again
8. Click the + to the left of Java Application, to unfold it
9. Select the Hello Feeder launch configuration

10. Click the Run button


Expected output

Running the processor and the feeder results in the following output, which can be viewed in the Console tab at the bottom of the screen.
Use the Display Selected Console button to switch between the feeder and processor output consoles.

Feeder expected output

The feeder starts, writes 1000 Message objects to the space, reads and prints one processed message, and finally prints the number of processed messages in the space:

Starting the Feeder (Will wait for the space to initialize first...)
FEEDER WROTE 1000 messages
Here is one of them printed out: id[47] info[Hello World !!]
There are 841 processed Message objects in the space now.
Press any key to continue . . .


Processor expected output

The processor prints the id and info attributes of each message it takes for processing:

Processor PROCESSING : id[445] info[Hello ]
Processor PROCESSING : id[904] info[Hello ]
Processor PROCESSING : id[896] info[Hello ]
Processor PROCESSING : id[446] info[Hello ]
Processor PROCESSING : id[889] info[Hello ]
   .
   .
   .
Processor PROCESSING : id[893] info[Hello ]
Processor PROCESSING : id[905] info[Hello ]
Processor PROCESSING : id[897] info[Hello ]
Processor PROCESSING : id[875] info[Hello ]
Processor PROCESSING : id[900] info[Hello ]


What's Next?

Step three - Deploying the Hello World Application onto the Service Grid - shows how to deploy our application onto the grid, to gain fail-over, recovery and self healing capabilities.

Or return to the Quick Start Guide.
