Summary: This tutorial shows how to implement a Master-Worker pattern, using OpenSpaces processing units, in order to perform parallel processing.

Overview

In this tutorial you will use OpenSpaces to implement a simple Order Management System. The tutorial illustrates how JavaSpaces can be used for parallel processing – execution of business logic by a number of autonomous software instances.

To do this, you will use the master-worker pattern, in which a master creates a set of tasks, puts them in a shared space, and waits for the tasks to be picked up and completed by a number of workers. The main advantages of this pattern are that the master and workers operate independently of each other (saving the need for complex coordination), and that the load is automatically balanced between the workers sharing the tasks.
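To make the pattern concrete, here is a minimal plain-Java sketch. It is not OpenSpaces code: it assumes two in-memory queues as a stand-in for the shared space, and the class name MasterWorkerSketch and the squaring "task" are purely illustrative.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: plain queues stand in for the shared space.
class MasterWorkerSketch {

    /** Squares the numbers 1..taskCount using workerCount workers and returns the sum. */
    static long runTasks(int taskCount, int workerCount) {
        BlockingQueue<Integer> tasks = new LinkedBlockingQueue<>();
        BlockingQueue<Long> results = new LinkedBlockingQueue<>();

        // Master: puts every task into the shared work area.
        for (int i = 1; i <= taskCount; i++) {
            tasks.add(i);
        }

        // Workers: each one keeps taking the next available task until none are left.
        ExecutorService workers = Executors.newFixedThreadPool(workerCount);
        for (int w = 0; w < workerCount; w++) {
            workers.execute(() -> {
                Integer task;
                while ((task = tasks.poll()) != null) {
                    long value = task;
                    results.add(value * value);
                }
            });
        }

        // Master: collects one result per task without knowing which worker produced it.
        long sum = 0;
        try {
            for (int i = 0; i < taskCount; i++) {
                Long result = results.poll(5, TimeUnit.SECONDS);
                if (result == null) {
                    throw new IllegalStateException("Timed out waiting for a result");
                }
                sum += result;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for results", e);
        } finally {
            workers.shutdown();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(runTasks(10, 3)); // prints 385 (1 + 4 + 9 + ... + 100)
    }
}
```

The master never addresses a specific worker, and a faster worker simply takes more tasks, which is the load balancing described above.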

Using JavaSpaces with OpenSpaces

JavaSpaces is especially suitable for implementing a master-worker pattern: the space can serve as a shared work area for numerous masters and workers, which can work together without knowing each other and without requiring central coordination.

However, plain JavaSpaces, as demonstrated in the previous tutorial, is too limited for a real-world master-worker implementation. OpenSpaces offers a powerful yet simple-to-use framework that overcomes the practical limitations of JavaSpaces.

You will learn how to use OpenSpaces to find the space more easily, and to let each component work under a local transaction without requiring a transaction coordinator.

In this tutorial you will construct your applications as simple OpenSpaces Processing Units, which enable the packaging and deployment of application services. In the next tutorials you will learn how to gain the full benefits of the Processing Unit concept by implementing Space-Based Architecture (SBA).

The Processing Unit is configured using Spring with deploy/runtime context extensions, such as bean-level properties. This allows you to use the same Processing Unit deployment with no changes, and deploy it within different environments (pre-prod, prod, etc.).

Application Components

You will build a Master-Worker application, in which the Master is a client (or multiple clients) that submits orders to the space, and takes them back from the space after they are processed by the workers.

There are two types of workers:

  • A Validator, which takes orders from the space and either approves or rejects them.
  • A Processor, which takes approved orders and performs a simple mathematical operation on them.



Application Workflow

The basic workflow, as it is represented by Order objects in the space:

  1. One or more clients continuously write new Order objects to the space; each Order is tagged with a client ID.
  2. If one or more Validators are running, they take new orders from the space.
  3. The Validators check the orders, mark them either as approved or rejected, and write them back to the space.
  4. If one or more Processors are running, they take any approved orders (which have gone through a Validator).
  5. The Processors work on the orders, and write back Processed order objects to the space.
  6. Each of the Clients routinely checks to see if there are any Processed orders marked with its unique ID. If so, it takes the results from the space.
  7. Each of the Clients is also registered for notifications, which it receives whenever a Rejected order marked with its unique ID is written to the space. When notified, it takes the rejected order from the space.

Notice that in this workflow, no component is dependent on any of the others. Clients write new orders to the space without caring how many Validators or Processors are running and which orders they have processed. They simply watch the space, and take processed orders using a polling mechanism or rejected orders using a notification mechanism when they appear. The same goes for the Validators and Processors – they simply watch the space for new orders to process or validate, and if they find any, perform the required operation and write the result back to the space.
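The workflow can be sketched in plain Java as a series of status transitions. This is only an illustration under simplifying assumptions: WorkflowSketch is a hypothetical single-process stand-in for the space, the Order class is reduced to two fields, and the approve/reject decision is passed in rather than computed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical in-memory stand-in for the space; not the OpenSpaces API.
class WorkflowSketch {

    static class Order {
        final String clientId;
        String status = "New";

        Order(String clientId) {
            this.clientId = clientId;
        }
    }

    private final List<Order> entries = new ArrayList<>();

    synchronized void write(Order order) {
        entries.add(order);
    }

    /** Removes and returns the first order with the given status (and client, if not null). */
    synchronized Order take(String status, String clientId) {
        Iterator<Order> it = entries.iterator();
        while (it.hasNext()) {
            Order order = it.next();
            boolean clientMatches = clientId == null || clientId.equals(order.clientId);
            if (order.status.equals(status) && clientMatches) {
                it.remove();
                return order;
            }
        }
        return null;
    }

    /** Steps 2-3: a Validator takes a New order and writes it back approved or rejected. */
    void validateOne(boolean approve) {
        Order order = take("New", null);
        if (order != null) {
            order.status = approve ? "Approved" : "Rejected";
            write(order);
        }
    }

    /** Steps 4-5: a Processor takes an Approved order and writes it back as Processed. */
    void processOne() {
        Order order = take("Approved", null);
        if (order != null) {
            order.status = "Processed";
            write(order);
        }
    }

    public static void main(String[] args) {
        WorkflowSketch space = new WorkflowSketch();
        space.write(new Order("client-1")); // step 1
        space.write(new Order("client-1"));
        space.validateOne(true);
        space.validateOne(false);
        space.processOne();
        // Steps 6-7: the client takes back only the orders tagged with its own ID.
        System.out.println(space.take("Processed", "client-1").status); // prints Processed
        System.out.println(space.take("Rejected", "client-1").status);  // prints Rejected
        System.out.println(space.take("Processed", "client-2"));        // prints null
    }
}
```

Each step only looks for entries in the state it cares about, so Validators and Processors can be added or removed without changing the Client.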

Tutorial Map

Tutorial Sections:

  • Overview – tutorial overview.
  • Tutorial Map – the sections in this page.
  • WorkFlow Animation – 50 second flash animation displaying the workflow.
  • Code – full code review of the entire application.
  • Build And Compile – how to prepare the application for running.
  • Deployment – how to deploy and run the application.
  • Running Within The IDE – how to run the application within Eclipse IDE.
  • Download – full downloadable package with source code and compiling/running scripts.
  • What's next – next tracks, performance boost, durability and scalability with SBA.


WorkFlow Animation

QSG_Track1_Animation.swf


Writing The Code

Domain Model POJO Services and Spring Wiring

The following tabs describe the Domain Model and the POJO service beans in each of the Processing Units, as well as the Processing Units' configuration files, which define their structure:

Domain Model

Our OrderEvent object is implemented as a simple POJO shown in the diagram below:

Because our domain model is used throughout the entire application by the different Processing Units, we should define it in a generic place, so we can then include it in each of the Processing Units.

The code itself is straightforward. What is interesting is the annotations it uses:

  • @SpaceClass – marks this class as an Entry that is written to the space (and then probably read or taken from the space).
  • @SpaceId(autoGenerate = true) – marks the property of the annotated getter method as the unique key for that object type. In this case, the orderID attribute (whose getter method is annotated) is unique per instance and is generated automatically when the object is written to the space.
/*
 * Copyright 2007 GigaSpaces Technologies LTD. All rights reserved.
 *
 * THIS SOFTWARE IS PROVIDED "AS IS," WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED INCLUDING BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY AND 
 * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. GIGASPACES WILL NOT 
 * BE LIABLE FOR ANY DAMAGE OR LOSS IN CONNECTION WITH THE SOFTWARE.
 */

package com.gigaspaces.examples.tutorials.parallelprocessing.common;

import com.gigaspaces.annotation.pojo.SpaceClass;
import com.gigaspaces.annotation.pojo.SpaceId;

/**
 * OrderEvent object important properties include the orderID
 * of the object, userName (also used to perform routing when working with partitioned space),
 * and a status indicating if this OrderEvent object is new, approved, processed, or rejected.
 * <p>
 * <code>@SpaceClass</code> annotation in this example is only to indicate that this class is a space class.
 */ 
@SpaceClass
public class OrderEvent {
	
	public static final String STATUS_NEW = "New";	
	public static final String STATUS_APPROVED = "Approved";	
	public static final String STATUS_PROCESSED = "Processed";
	public static final String STATUS_REJECTED = "Rejected";
    
    /**
     * ID of the order.
     */
    private String orderID;
    
    /**
     * ID of the client that placed the order.
     */
    private String clientID;
    
    /**
     * User name of the order.
     */
    private String userName;
    
    /**
     * 	Order status, Possible values: New, Approved, Processed, Rejected
     * */
    private String status;		
    
    /**
     * Constructs a new OrderEvent object. 
     * */
    public OrderEvent() {
    }

    /** 
     * Constructs a new OrderEvent object with the given userName and clientID.
     * The status is initialized to New.
     * @param userName
     * @param clientID
     */
    public OrderEvent(String userName, String clientID) {
        this.userName = userName;
        this.clientID = clientID;
        this.status = STATUS_NEW;
    }

    /** 
     * Gets the ID of the orderEvent.<p>
     * <code>@SpaceId</code> annotation indicates that its value will be auto generated 
     * when it is written to the space. 
     */
    @SpaceId(autoGenerate = true)
    public String getOrderID() {
        return orderID;
    }

    /**
     * Sets the ID of the orderEvent.
     * @param orderID
     */
    public void setOrderID(String orderID) {
        this.orderID = orderID;
    }

    /** 
     * @return userName - Gets the user name of the orderEvent object.
     */
    public String getUserName() {
        return userName;
    }

    /** 
     * @param userName - set the user name of the orderEvent object.
     */
    public void setUserName(String userName) {
        this.userName = userName;
    }
    
    /**
     *	Outputs the orderEvent object attributes.
     */
    public String toString() {
        return "userName[" + userName + "] status[" + status + "]";
    }

	/** 
	 *	@return status - the orderEvent status. 
	 */
	public String getStatus() {
		return status;
	}

	/**
	 *  @param status - Sets the orderEvent status.
	 */
	public void setStatus(String status) {
		this.status = status;
	}

	/** 
     *	@return clientID - Gets the clientID of the orderEvent object.
     */
	public String getClientID() {
		return clientID;
	}

	/**
	 *  @param clientID - Sets the orderEvent clientID.
	 */
	public void setClientID(String clientID) {
		this.clientID = clientID;
	}
}



Client - Feeder Service

Client - Feeder Service Bean

The Client's Feeder service writes OrderEvents to the space, which triggers the application's processing. The service implements the InitializingBean interface, which means the afterPropertiesSet method is called when the service is loaded. Additionally, it implements DisposableBean, which means the destroy method is called when the service is stopped.

The objects are written directly to the space (as defined in the OrderEventFeederTask inner class). The reference to the space proxy is received through injection from the Processing Unit by using the @GigaSpaceContext(name = "gigaSpace") annotation before the declaration of the GigaSpace member. This annotation marks the injection of a GigaSpace object that is defined with a certain name (e.g. gigaSpace) within the Client's pu.xml configuration file.

Feeder service bean source code:

/*
 * Copyright 2008 GigaSpaces Technologies Ltd. All rights reserved.
 *
 * THIS SOFTWARE IS PROVIDED "AS IS," WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED INCLUDING BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY AND 
 * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. GIGASPACES WILL NOT 
 * BE LIABLE FOR ANY DAMAGE OR LOSS IN CONNECTION WITH THE SOFTWARE.
 */

package com.gigaspaces.examples.tutorials.parallelprocessing.client;

import org.openspaces.core.GigaSpace;
import org.openspaces.core.context.GigaSpaceContext;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

import com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent;

import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.rmi.server.UID;

/**
 * A feeder bean that starts a scheduled task that writes a new OrderEvent object to the space.
 *
 * <p>The space is injected into this bean using OpenSpaces support for @GigaSpaceContext
 * annotation.
 *
 * <p>The scheduled support uses the java.util.concurrent Scheduled Executor Service. It
 * is started and stopped based on Spring life-cycle events.
 */
public class OrderEventFeeder implements InitializingBean, DisposableBean {

	private Random randomGen = new Random();

    private ScheduledExecutorService executorService;

    //	Delayed result bearing action
    private ScheduledFuture<?> sf;
    
    /**
     * Delay between scheduled tasks
     */
    private long defaultDelay = 1000;
    
    /**
     * The scheduled orderEvent feeding task.
     */ 
    private OrderEventFeederTask orderEventFeederTask;
    
    @GigaSpaceContext(name = "gigaSpace")
    private GigaSpace gigaSpace;
    
    /**
     * Unique ID for this client
     */
    private UID clientID;
    
    /**
     * @param defaultDelay - Sets default delay between feeding tasks.
     */
    public void setDefaultDelay(long defaultDelay) {
        this.defaultDelay = defaultDelay;
    }
    
	/**
	 * The first method run upon bean Initialization when implementing InitializingBean.
	 * Starts a scheduled orderEvent feeding task. 
	 */
	public void afterPropertiesSet() throws Exception {
    	
		//	Create unique ID for this client
		clientID = new UID(); 
		
		System.out.println("CLIENT ["+clientID.toString()+"] Starting feeder with cycle <" + defaultDelay + ">");
        
		//	Create a thread pool containing 1 thread capable of performing scheduled tasks
        executorService = Executors.newScheduledThreadPool(1);
        
        orderEventFeederTask = new OrderEventFeederTask();
        
        //	Schedule the thread to execute the task at fixed rate with the default delay defined 
        sf = executorService.scheduleAtFixedRate(
        										orderEventFeederTask	// The task to schedule
        										,defaultDelay 			// Initial Delay before starting
        										,defaultDelay			// Delay between tasks
        										,TimeUnit.MILLISECONDS	// Time unit for the delay
        										);
    }

    public void destroy() throws Exception {
    	//	Shutting down the thread pool upon bean disposal
    	sf.cancel(true);
        sf = null;
        executorService.shutdown();
    }

    public class OrderEventFeederTask implements Runnable {
    	
    	//	Counts number of fed orderEvents
        private int counter;
                 
        public void run() {
            try {       
            	//	Create a new orderEvent with randomized userName and stamps the clientID
            	OrderEvent orderEvent = new OrderEvent("USER" +randomGen.nextInt(), clientID.toString());
                //	Write the new orderEvent to the space
            	gigaSpace.write(orderEvent);
            	//	Count the fed orderEvent so that getFeedCount() reports it
            	counter++;
                System.out.println("CLIENT wrote orderEvent: "+orderEvent);
            } 
            catch (Exception e) {
                e.printStackTrace();
            }
        }

        public int getCounter() {
            return counter;
        }
    }

    public int getFeedCount() {
        return orderEventFeederTask.getCounter();
    }

	public void setClientID(UID clientID) {
		this.clientID = clientID;
	}

	public UID getClientID() {
		return clientID;
	}
}



Client - Counter Service

Client - Counter Service Bean

This service counts and displays information about OrderEvents that are written to the space with different states. When the service is loaded, its internal counters are reset. After this reset, the counters are updated on each event that is received by the service.

In the Client Processing Unit, this service is encapsulated inside two different kinds of event containers as shown in the diagram above. These containers (polling/notify containers) define how and when the Counter service is invoked. The next tab Client - Config shows the differences and how to configure these containers.

Counter service bean source code:

package com.gigaspaces.examples.tutorials.parallelprocessing.client;

import org.openspaces.events.adapter.SpaceDataEvent;

import com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * A simple bean counting and outputting:
 * number of processed and rejected orderEvents taken by the client.
 * 
 * Holds 2 simple counters that are incremented each time a matching event occurs.
 * Outputting the orderEvent updated status.
 */
public class OrderEventCounterDisplayer {
	
    private AtomicInteger orderEventProcessedCounter = new AtomicInteger(0);
    private AtomicInteger orderEventRejectedCounter = new AtomicInteger(0);
        
    @SpaceDataEvent
    public void outputInfo(OrderEvent orderEvent) {
        
        if (orderEvent.getStatus().equals(OrderEvent.STATUS_PROCESSED)){
        	orderEventProcessedCounter.incrementAndGet();
        }
        else {
        	if (orderEvent.getStatus().equals(OrderEvent.STATUS_REJECTED)){
            	orderEventRejectedCounter.incrementAndGet();
        	}
        }
        System.out.println("CLIENT took "+orderEvent.getStatus()+ 
				" OrderEvent , Total rejected taken ["+orderEventRejectedCounter+"], Total processed taken ["
				+orderEventProcessedCounter+"]");
    }  
}



Client - Config

Client - XML Configuration File

By now, the implementation of the Client services should be finished, but the process isn't yet complete. The space configuration and the types of events the services should handle aren't defined in the code. Instead, we define all of these in the configuration file (pu.xml) of each Processing Unit.

The polling container and notify container are defined in the Client's pu.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:os-core="http://www.openspaces.org/schema/core"
       xmlns:os-events="http://www.openspaces.org/schema/events"
       xmlns:os-remoting="http://www.openspaces.org/schema/remoting"
       xmlns:os-sla="http://www.openspaces.org/schema/sla"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.openspaces.org/schema/core http://www.openspaces.org/schema/core/openspaces-core.xsd
       http://www.openspaces.org/schema/events http://www.openspaces.org/schema/events/openspaces-events.xsd
       http://www.openspaces.org/schema/remoting http://www.openspaces.org/schema/remoting/openspaces-remoting.xsd
       http://www.openspaces.org/schema/sla http://www.openspaces.org/schema/sla/openspaces-sla.xsd">
    
<!-- ================================================================================================================== -->
	
    <!-- Enables the usage of @GigaSpaceContext annotation based injection. -->
    <os-core:giga-space-context/>
	
    <!-- A bean representing a space (an IJSpace implementation).
         Note, we perform a lookup on the space since we are working against a remote space. -->
    <os-core:space id="space" url="jini://*/*/spacePP"/>

    <!-- OpenSpaces simplified space API built on top of IJSpace/JavaSpace. -->
    <os-core:giga-space id="gigaSpace" space="space"/>
	
	<!-- Defines a local Jini transaction manager. -->
    <os-core:local-tx-manager id="transactionManager" space="space"/>
	
    <!-- The Data feeder bean, writing new OrderEvents objects to the space in a constant interval. -->
    <bean id="orderEventFeeder" class="com.gigaspaces.examples.tutorials.parallelprocessing.client.OrderEventFeeder"/>
	
	<!-- This bean outputs the orderEvent object -->
    <bean id="outputOrderEvent" class="com.gigaspaces.examples.tutorials.parallelprocessing.client.OrderEventCounterDisplayer"/>
    
<!-- ================================================================================================================== -->
    
    <!-- The notification container, registers for notification on every orderEvent write (notify 
    	 on write is default) that satisfies the template (in this case with status="Rejected").
    	 Upon notification invokes the outputOrderEvent listener on a copy of the object that 
    	 triggered the event.
    	 The notify event container is also configured to automatically perform a take on the
    	 notification data event, and configured to filter out events if the take operation returned null 
    	 (this usually happens when several clients receive this event, and only one succeeds with the take) 
    	 -->
	<os-events:notify-container id="orderEventNotifyContainer" giga-space="gigaSpace" perform-take-on-notify="true" ignore-event-on-null-take="true">
		<os-events:tx-support tx-manager="transactionManager"/>
		<os-core:template>
	        <bean class="com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent">
	        	<property name="status" value="Rejected"/>
	        </bean>
	    </os-core:template>
	    <os-events:listener>
	        <os-events:annotation-adapter>
	            <os-events:delegate ref="outputOrderEvent"/>
	        </os-events:annotation-adapter>
	    </os-events:listener>
	</os-events:notify-container>
	
	<!-- A polling event container that performs (by default) polling take operations against
         the space using the provided template (in this case, the processed orderEvent objects).
         Once a match is found, the outputOrderEvent bean event listener is triggered using the
         annotation adapter, the listener method is annotated inside the bean with the @SpaceDataEvent
         annotation. -->
    <os-events:polling-container id="orderEventPollingEventContainer" giga-space="gigaSpace">
        <os-events:tx-support tx-manager="transactionManager"/>
        <os-core:template>
            <bean class="com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent">
                <property name="status" value="Processed"/>
            </bean>
        </os-core:template>
        <os-events:listener>
            <os-events:annotation-adapter>
                <os-events:delegate ref="outputOrderEvent"/>
            </os-events:annotation-adapter>
        </os-events:listener>
    </os-events:polling-container>
	
</beans>
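The comment on the notify container describes two settings, perform-take-on-notify and ignore-event-on-null-take. Their combined effect can be sketched in plain Java as follows. This is a hypothetical, single-threaded model (NotifyTakeSketch is not an OpenSpaces class, and template matching is left out); it only illustrates why each rejected order reaches exactly one client listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory model of "take on notify"; not the OpenSpaces implementation.
class NotifyTakeSketch {

    private final List<String> entries = new ArrayList<>();
    private final List<AtomicInteger> clientCounters = new ArrayList<>();

    /** Registers a client and returns the counter of events its listener handled. */
    AtomicInteger registerClient() {
        AtomicInteger counter = new AtomicInteger();
        clientCounters.add(counter);
        return counter;
    }

    /** Removes the entry if it is still present; returns null if another client took it. */
    private String take(String entry) {
        return entries.remove(entry) ? entry : null;
    }

    /** Writes an entry, then notifies every registered client about it. */
    void write(String entry) {
        entries.add(entry);
        for (AtomicInteger counter : clientCounters) {
            String taken = take(entry);   // perform-take-on-notify
            if (taken == null) {
                continue;                 // ignore-event-on-null-take
            }
            counter.incrementAndGet();    // only now is the listener invoked
        }
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        NotifyTakeSketch space = new NotifyTakeSketch();
        AtomicInteger first = space.registerClient();
        AtomicInteger second = space.registerClient();
        space.write("rejected-order-1");
        space.write("rejected-order-2");
        // Both clients were notified twice, but each order was handled exactly once.
        System.out.println(first.get() + second.get()); // prints 2
    }
}
```

In this model the first registered client always wins the take; in a real deployment, which client wins depends on timing.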



Validator - Validator Service

Validator - Validator Service Bean

The OrderEventValidator takes OrderEvents with status New from the space, and sets their status randomly to Approved or Rejected, simulating a validation process. When you follow the code, you can see that there is no indication of the type of OrderEvents handled by the Validator. All this logic is defined in the Validator's pu.xml file, as well as the type of operation the Validator performs on the space (by default, a take operation of a single object that matches the defined template), and at which interval.

The service implementation does not need to implement any specific interface or to have any special code. The only thing necessary is the use of the @SpaceDataEvent annotation, which marks the method that handles the event (in this case, the event is a take of an OrderEvent from the space). Note that the same event-handling method (validatesOrderEvent in our case) has a return value. This returned object is written back to the space by the container.

Validator service bean source code:

package com.gigaspaces.examples.tutorials.parallelprocessing.validator;

import org.openspaces.events.adapter.SpaceDataEvent;

import com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent;

/**
 * Simple bean used to validate the orderEvent objects.
 */
public class OrderEventValidator {

    private long workDuration = 100;
    
	/**
     * Sets the simulated work duration (in milliseconds). Default to 100.
     */
    public void setWorkDuration(long workDuration) {
        this.workDuration = workDuration;
    }

    /**
     * Validate the given OrderEvent object and return the validated OrderEvent with
     * status field set to Approved/Rejected according to the validation.
     *
     * Can be invoked using OpenSpaces Events when a matching event
     * occurs or using OpenSpaces remoting.
     */
    @SpaceDataEvent	//	This annotation marks the method as the event listener.
    public OrderEvent validatesOrderEvent(OrderEvent orderEvent) {
            	    	
    	// sleep to simulate some work
        try {
            Thread.sleep(workDuration);
        } catch (InterruptedException e) {
            // do nothing
        }
     
        System.out.println("VALIDATOR validating orderEvent: "+orderEvent);
        
        //	For the sake of the example the status is arbitrarily randomized.
		if ( Math.random() >= 0.5)
		{
			orderEvent.setStatus(OrderEvent.STATUS_APPROVED);
		}
		else 
		{
			orderEvent.setStatus(OrderEvent.STATUS_REJECTED);
		}       
        
		System.out.println("VALIDATOR set orderEvent status to: "+orderEvent.getStatus());
		
        //  orderID is declared as primary key and as auto-generated. 
    	//	It must be reset to null before the object is written back to the space.
    	orderEvent.setOrderID(null);
    	
        return orderEvent;
    }
}
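The division of labor between the container and the listener method can be sketched in plain Java. This is a rough, hypothetical model (PollingLoopSketch and drain are not OpenSpaces names); it ignores templates, transactions and threading, and it assumes that a null return value means nothing is written back.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.UnaryOperator;

// Hypothetical sketch of a polling loop; not the OpenSpaces polling container.
class PollingLoopSketch {

    /**
     * Takes entries from the source one at a time, passes each to the listener,
     * and writes any non-null return value to the destination.
     * Returns the number of entries taken.
     */
    static <T> int drain(Queue<T> source, UnaryOperator<T> listener, Queue<T> destination) {
        int handled = 0;
        T entry;
        while ((entry = source.poll()) != null) {   // the "take"
            T result = listener.apply(entry);       // the annotated listener method
            if (result != null) {
                destination.add(result);            // the loop writes the result back
            }
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        Queue<String> newOrders = new ArrayDeque<>();
        Queue<String> validatedOrders = new ArrayDeque<>();
        newOrders.add("order-1");
        newOrders.add("order-2");

        // The listener holds only business logic and never touches either queue.
        int handled = drain(newOrders, order -> order + ":Approved", validatedOrders);

        System.out.println(handled);         // prints 2
        System.out.println(validatedOrders); // prints [order-1:Approved, order-2:Approved]
    }
}
```

Because the listener is an ordinary method, the way events are delivered can change in configuration without touching the business logic.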



Validator - Config

Validator - XML Configuration file

Just like in the Client Processing Unit, the space configuration and the types of events the Validator services should handle are defined in the Validator Processing Unit configuration file (pu.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:os-core="http://www.openspaces.org/schema/core"
       xmlns:os-events="http://www.openspaces.org/schema/events"
       xmlns:os-remoting="http://www.openspaces.org/schema/remoting"
       xmlns:os-sla="http://www.openspaces.org/schema/sla"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.openspaces.org/schema/core http://www.openspaces.org/schema/core/openspaces-core.xsd
       http://www.openspaces.org/schema/events http://www.openspaces.org/schema/events/openspaces-events.xsd
       http://www.openspaces.org/schema/remoting http://www.openspaces.org/schema/remoting/openspaces-remoting.xsd
       http://www.openspaces.org/schema/sla http://www.openspaces.org/schema/sla/openspaces-sla.xsd">

<!-- ============================================================================================= -->

    <!-- Enables the usage of @GigaSpaceContext annotation based injection. -->
    <os-core:giga-space-context/>

    <!-- A bean representing a space (an IJSpace implementation).
		 Note, we do not specify here the cluster topology of the space. It is declared outside of
         the processing unit or within the SLA bean. -->
    <os-core:space id="space" url="jini://*/*/spacePP"/>

    <!-- OpenSpaces simplified space API built on top of IJSpace/JavaSpace. -->
    <os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/>

    <!-- Defines a local Jini transaction manager. -->
    <os-core:local-tx-manager id="transactionManager" space="space"/>
	
	<!-- The orderEvent validator bean -->
    <bean id="orderEventValidator" class="com.gigaspaces.examples.tutorials.parallelprocessing.validator.OrderEventValidator"/>

<!-- ========================================================================================================================= -->

	<!-- A polling event container that performs (by default) polling take operations against
         the space using the provided template (in our case, the new orderEvents objects).
         Once a match is found, the orderEvent validator bean event listener is triggered using the
         annotation adapter, the listener method is annotated inside the bean with the @SpaceDataEvent
         annotation. -->
    <os-events:polling-container id="orderEventValidatorPollingEventContainer" giga-space="gigaSpace">
        <os-events:tx-support tx-manager="transactionManager"/>
        <os-core:template>
            <bean class="com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent">
                <property name="status" value="New"/>
            </bean>
        </os-core:template>
        <os-events:listener>
            <os-events:annotation-adapter>
                <os-events:delegate ref="orderEventValidator"/>
            </os-events:annotation-adapter>
        </os-events:listener>
    </os-events:polling-container>
   
</beans>
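The template bean in the configuration above sets only the status property. In JavaSpaces-style matching by example, properties left as null act as wildcards. The plain-Java sketch below illustrates that rule only; OrderTemplateSketch is a hypothetical class reduced to two properties, not part of OpenSpaces.

```java
import java.util.Objects;

// Hypothetical sketch of matching by example; not an OpenSpaces class.
class OrderTemplateSketch {

    final String clientId;
    final String status;

    OrderTemplateSketch(String clientId, String status) {
        this.clientId = clientId;
        this.status = status;
    }

    /** A null template property matches anything; a non-null one must be equal. */
    private static boolean propertyMatches(String templateValue, String actualValue) {
        return templateValue == null || Objects.equals(templateValue, actualValue);
    }

    /** Returns true if the candidate satisfies every non-null property of this template. */
    boolean matches(OrderTemplateSketch candidate) {
        return propertyMatches(clientId, candidate.clientId)
                && propertyMatches(status, candidate.status);
    }

    public static void main(String[] args) {
        // Similar in spirit to a template bean with only status="New" set.
        OrderTemplateSketch newOrders = new OrderTemplateSketch(null, "New");

        System.out.println(newOrders.matches(new OrderTemplateSketch("client-1", "New")));      // prints true
        System.out.println(newOrders.matches(new OrderTemplateSketch("client-2", "New")));      // prints true
        System.out.println(newOrders.matches(new OrderTemplateSketch("client-1", "Approved"))); // prints false
    }
}
```

This is why the Validator picks up New orders from every client: its template constrains the status and nothing else.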



Processor - Processor Service

Much like the OrderEventValidator, the OrderEventProcessor has a method that is annotated with @SpaceDataEvent to mark it as the handler of events. Again, the events are OrderEvents taken from the space, but this time only orders with status Approved, which have already passed through the Validator.

/*
 * Copyright 2008 GigaSpaces Technologies LTD. All rights reserved.
 *
 * THIS SOFTWARE IS PROVIDED "AS IS," WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED INCLUDING BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY AND 
 * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. GIGASPACES WILL NOT 
 * BE LIABLE FOR ANY DAMAGE OR LOSS IN CONNECTION WITH THE SOFTWARE.
 */

package com.gigaspaces.examples.tutorials.parallelprocessing.processor;

import org.openspaces.events.adapter.SpaceDataEvent;

import com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent;

/**
 * The processor uses OpenSpaces support for annotation mark-up allowing to use @SpaceDataEvent to
 * mark a method as an event listener. Note, processOrderEvent does not use any space
 * API on the OrderEvent object (though it can), receiving the OrderEvent object to be processed 
 * and returning the result that will be automatically written back to the space.
 * 
 * <p>You can set the simulated work done when processOrderEvent is called by setting the work 
 * duration (defaults to 100 ms).
 * 
 * <p>Note, changing the event container is just a matter of xml configuration (for example,
 * switching from polling container to notify container) and does not affect this class.
 */
public class OrderEventProcessor {

    private long workDuration = 100;
    
    /** 
     * Sets the simulated work duration (in milliseconds). Default to 100. 
     **/
    public void setWorkDuration(long workDuration) {
        this.workDuration = workDuration;
    }

    /**  
     * Process the given OrderEvent object and returns the processed OrderEvent.
     * <p>
     * Annotated @SpaceDataEvent - Marks this method to be executed when an event occurs. 
     **/
    @SpaceDataEvent
    public OrderEvent processOrderEvent(OrderEvent orderEvent) {
        
    	// sleep to simulate some work
        try {
            Thread.sleep(workDuration);
        } catch (InterruptedException e) {
            // do nothing
        }
        
        System.out.println("PROCESSOR Processing orderEvent: "+orderEvent);
        
        //	In this simple example the only processing work will be changing the orderEvent status.
        orderEvent.setStatus(OrderEvent.STATUS_PROCESSED);

        //  orderID is declared as primary key and as auto-generated. 
    	//	It must be reset to null before the object is written back to the space.
    	orderEvent.setOrderID(null);       
        
    	return orderEvent;
    }
}



Processor - Config

Processor - XML Configuration File

Again, like in the Client and Validator Processing Units, the space configuration and the types of events the processor services should handle are defined in the configuration file (pu.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:os-core="http://www.openspaces.org/schema/core"
       xmlns:os-events="http://www.openspaces.org/schema/events"
       xmlns:os-remoting="http://www.openspaces.org/schema/remoting"
       xmlns:os-sla="http://www.openspaces.org/schema/sla"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.openspaces.org/schema/core http://www.openspaces.org/schema/core/openspaces-core.xsd
       http://www.openspaces.org/schema/events http://www.openspaces.org/schema/events/openspaces-events.xsd
       http://www.openspaces.org/schema/remoting http://www.openspaces.org/schema/remoting/openspaces-remoting.xsd
       http://www.openspaces.org/schema/sla http://www.openspaces.org/schema/sla/openspaces-sla.xsd">

<!-- ============================================================================================================ -->

    <!-- Enables the usage of @GigaSpaceContext annotation based injection. -->
    <os-core:giga-space-context/>

    <!-- A bean representing a space (an IJSpace implementation).
		 Note, we do not specify here the cluster topology of the space. It is declared outside of
         the processing unit or within the SLA bean. -->
    <os-core:space id="space" url="jini://*/*/spacePP"/>

    <!-- OpenSpaces simplified space API built on top of IJSpace/JavaSpace. -->
    <os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/>

    <!-- Defines a local Jini transaction manager. -->
    <os-core:local-tx-manager id="transactionManager" space="space"/>
	
    <!-- The orderEvent processor bean -->
    <bean id="orderEventProcessor" class="com.gigaspaces.examples.tutorials.parallelprocessing.processor.OrderEventProcessor"/>
	
<!-- ============================================================================================================ -->
	
    <!-- A polling event container that performs (by default) polling take operations with transaction
         support against the space using the provided template (in our case, the approved orderEvent objects).
         Once a match is found, the data processor bean event listener is triggered using the
         annotation adapter (the method annotated with @SpaceDataEvent inside the class is invoked). -->
    <os-events:polling-container id="orderEventProcessorPollingEventContainer" giga-space="gigaSpace">
        <os-events:tx-support tx-manager="transactionManager"/>
        <os-core:template>
            <bean class="com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent">
                <property name="status" value="Approved"/>
            </bean>
        </os-core:template>
        <os-events:listener>
            <os-events:annotation-adapter>
                <os-events:delegate ref="orderEventProcessor"/>
            </os-events:annotation-adapter>
        </os-events:listener>
    </os-events:polling-container>

</beans>
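
To make the container's behavior concrete, here is a minimal plain-Java sketch of one polling cycle: take an object that matches the template, hand it to the listener, and write the returned object back. It is an illustration only and does not use the OpenSpaces API. PollingSketch, InMemorySpace, Order and pollOnce are hypothetical stand-ins, the "Processed" status string is assumed, and the transaction that the real container wraps around each cycle is omitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.UnaryOperator;

class PollingSketch {

    /** Hypothetical stand-in for the domain object; only the status matters here. */
    static class Order {
        final String userName;
        String status;

        Order(String userName, String status) {
            this.userName = userName;
            this.status = status;
        }
    }

    /** Hypothetical in-memory stand-in for a space; not the OpenSpaces API. */
    static class InMemorySpace {
        private final List<Order> entries = new ArrayList<>();

        synchronized void write(Order order) {
            entries.add(order);
        }

        /** Removes and returns the first entry with the template status, or null. */
        synchronized Order take(String templateStatus) {
            for (Iterator<Order> it = entries.iterator(); it.hasNext();) {
                Order candidate = it.next();
                if (templateStatus.equals(candidate.status)) {
                    it.remove();
                    return candidate;
                }
            }
            return null;
        }

        synchronized int count(String status) {
            int n = 0;
            for (Order order : entries) {
                if (status.equals(order.status)) {
                    n++;
                }
            }
            return n;
        }
    }

    /** One polling cycle: take a match, run the listener, write the result back. */
    static boolean pollOnce(InMemorySpace space, String templateStatus,
                            UnaryOperator<Order> listener) {
        Order taken = space.take(templateStatus);
        if (taken == null) {
            return false; // nothing matched the template
        }
        Order result = listener.apply(taken);
        if (result != null) {
            space.write(result);
        }
        return true;
    }

    public static void main(String[] args) {
        InMemorySpace space = new InMemorySpace();
        space.write(new Order("USER1", "Approved"));
        space.write(new Order("USER2", "Rejected"));

        // The listener plays the role of processOrderEvent: it only changes the status.
        UnaryOperator<Order> processor = order -> {
            order.status = "Processed";
            return order;
        };

        while (pollOnce(space, "Approved", processor)) {
            // keep polling until no Approved entries remain
        }

        // prints: Processed: 1, Rejected: 1
        System.out.println("Processed: " + space.count("Processed")
                + ", Rejected: " + space.count("Rejected"));
    }
}
```

The Rejected entry is never touched because it does not match the template, which is the same reason the Processor in this tutorial only sees approved orderEvent objects.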


Build And Compile

Once everything is ready, we need to build, package and deploy the application.

The following JAR files need to be in your classpath, all of which reside in the <GigaSpaces Root>\lib directory or its subfolders:

  • JSpaces.jar
  • openspaces/openspaces.jar
  • spring/spring.jar
  • jini/jsk-platform.jar
  • jini/jsk-lib.jar

In order to deploy the application, we need to deploy each of the three Processing Units separately. To do this, every Processing Unit must be placed in the <GigaSpaces Root>\deploy directory. Every Processing Unit is actually a folder (whose name is the name of the Processing Unit later used for deployment) with several subfolders. Here's a typical Processing Unit directory as it resides under the <GigaSpaces Root>\deploy directory:

As the image shows, under the deploy folder of the product, we've put the pp-oms-client folder, which is in fact the application's Client Processing Unit. The folder name states the name of the Processing Unit.

Under META-INF\spring is our pu.xml file for the Client Processing Unit. Our compiled code sits directly under the Processing Unit folder, with its appropriate package structure (the com folder in the image above).

Finally, the shared-lib folder includes all libraries that are used globally by this Processing Unit and potentially other Processing Units. In this case, shared-lib holds a JAR file that includes our domain model object. The same JAR is placed under the shared-lib folder of each of the Processing Units in our application.

To summarize the building and packaging process:

  1. Compile your code.
  2. Create a common JAR file for your shared libraries.
  3. Create a folder of the Processing Unit that complies with the structure shown in the image, and copy all the necessary files into it accordingly. Give the folder any name you want for this Processing Unit.
  4. Copy the created folder into the <GigaSpaces Root>\deploy directory.
  5. Repeat for every Processing Unit in the application.
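
As a rough illustration of step 3, the following Java sketch creates the skeleton of a Processing Unit folder: a META-INF\spring directory for pu.xml and a shared-lib directory for the common JAR. It is a sketch under stated assumptions, not part of the example package. PuLayoutSketch and createSkeleton are hypothetical names, a temporary directory stands in for <GigaSpaces Root>\deploy, and the pu.xml content is a placeholder.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class PuLayoutSketch {

    /**
     * Creates the empty folder structure of a Processing Unit under deployDir
     * and returns the Processing Unit folder. Compiled classes would later be
     * copied directly under the returned folder, keeping their package structure.
     */
    static Path createSkeleton(Path deployDir, String puName) {
        try {
            Path puDir = deployDir.resolve(puName);
            Files.createDirectories(puDir.resolve("META-INF").resolve("spring"));
            Files.createDirectories(puDir.resolve("shared-lib"));
            return puDir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: a temporary directory stands in for the real deploy directory.
        Path deployDir = Files.createTempDirectory("deploy");
        Path puDir = createSkeleton(deployDir, "pp-oms-client");

        // Placeholder only; copy your real pu.xml to this location instead.
        Path puXml = puDir.resolve("META-INF").resolve("spring").resolve("pu.xml");
        Files.write(puXml, "<!-- pu.xml goes here -->".getBytes(StandardCharsets.UTF_8));

        System.out.println("Created Processing Unit skeleton at " + puDir);
    }
}
```

The supplied scripts and Ant targets described next do this work for you; the sketch only shows which folders a deployable Processing Unit is expected to contain.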

The full example package also includes two prepared automated ways to build and compile the application:

Using supplied batch scripts for Windows/Unix:

  1. Run <Example Folder>\bin\compile.bat/.sh – compiles the modules and arranges their structure for deployment.
  2. Run <Example Folder>\bin\copy_to_deploy_folder.bat/.sh – copies the ready-to-deploy modules to the <GigaSpaces Root>\deploy folder.

Using supplied build.xml file and ant:

  1. Running build compiles all the different modules. In the case of the Client, Validator and Processor modules, build compiles the classes directly into their respective Processing Unit structure.
  2. Running dist finalizes the structure of all Processing Units by copying the common module JAR into the shared-lib directory in the Processing Unit structure.
    For example, in the client module, dist copies the JAR to client/pu/pp-oms-client/shared-lib, and makes client/pu/pp-oms-client a ready-to-use Processing Unit.
  3. Running copy-local-client, copy-local-validator and copy-local-processor copies the finalized Processing Units to the <GigaSpaces Root>\deploy folder, ready to be deployed into the Service Grid.

Deployment

You are now ready to deploy your application, but first, a short recap:

Our Order Management System application has three Processing Units (Client, Validator and Processor) and an Enterprise Data Grid that contains a single space instance. The domain model consists of the OrderEvent object.
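
The flow between the three Processing Units can be traced in a few lines of plain Java. The sketch below follows each order through the statuses that appear in this tutorial's output (New, Approved or Rejected, then Processed) and counts what the Client would finally take. It runs sequentially in one thread and uses no space, so it shows the status transitions only, not the parallelism. OrderFlowSketch, the Status enum and the even/odd approval rule are hypothetical; the tutorial's Validator applies its own rule.

```java
class OrderFlowSketch {

    enum Status { NEW, APPROVED, REJECTED, PROCESSED }

    /** Validator role. The even/odd approval rule is a hypothetical placeholder. */
    static Status validate(int orderNumber) {
        return orderNumber % 2 == 0 ? Status.APPROVED : Status.REJECTED;
    }

    /** Processor role: only approved orders are processed; others pass through. */
    static Status process(Status status) {
        return status == Status.APPROVED ? Status.PROCESSED : status;
    }

    /** Runs orders 0..count-1 through both workers; returns {processed, rejected}. */
    static int[] run(int count) {
        int processed = 0;
        int rejected = 0;
        for (int orderNumber = 0; orderNumber < count; orderNumber++) {
            Status finalStatus = process(validate(orderNumber));
            if (finalStatus == Status.PROCESSED) {
                processed++;
            } else if (finalStatus == Status.REJECTED) {
                rejected++;
            }
        }
        return new int[] {processed, rejected};
    }

    public static void main(String[] args) {
        int[] totals = run(5);
        // prints: Total rejected taken [2], Total processed taken [3]
        System.out.println("Total rejected taken [" + totals[1]
                + "], Total processed taken [" + totals[0] + "]");
    }
}
```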

There are several ways to deploy the application and to run a Processing Unit. A Processing Unit can either run in standalone mode in your IDE (for development and testing purposes), or on top of the Service Grid, in SLA-driven containers that are called GSCs (Grid Service Containers). In this section, we'll show the latter approach, which is used in production environments.

Because we want to deploy to the Service Grid, we first need to start it. Running the Service Grid is as easy as running one GSM (Grid Service Manager) from <GigaSpaces Root>\bin, and several GSCs, on top of which we run our Processing Units. The deployed application looks like this:



Although the image above shows the Client Processing Unit and the space in one container and the Validator and Processor Processing Units in a second container, they might be arranged differently, depending on the deployment order. For example, the Validator and Client might be in one SLA container, while the other holds the Processor and the space. We can also run more SLA containers (GSCs) and have each component inside its own SLA container.

Starting Service Grid Components

  1. To start the GSM, execute:
    <GigaSpaces Root>\bin\gsm.(sh/bat)
  2. Run 4 GSCs by executing the following 4 times:
    <GigaSpaces Root>\bin\gsc.(sh/bat)
  3. Start the GigaSpaces Management Center:
    <GigaSpaces Root>\bin\gs-ui.(sh/bat)
  4. Click the left Deployments tab.



  5. The running GSCs are displayed in the Details area. They are all still empty, because no Processing Units or space instances have been deployed.

    EDG Space Instance Deployment

  6. After selecting the Deployments tab, click the Deploy Application button to open the Deployment Wizard.
  7. In the Deployment Wizard, select the Enterprise Data Grid option and click Next:



  8. Fill the Data Grid Name attribute with spacePP and choose blank for the Cluster schema attribute as shown below:



  9. Click Deploy to deploy the EDG. The following screen should appear:



  10. After the deployment is successful, click Close.



  11. Now, have another look at the lower part of the Deployments tab. You should see that one of the GSCs contains the EDG space instance you just deployed.



Deploying the Processing Units

  1. To deploy the Processing Unit, click the deploy button. The Deployment Wizard opens:



  2. In the first page of the wizard (screen above), click the SBA Application - Processing Unit radio button, and click Next to see the deployment dialog:



  3. In the Processing Unit name field, select the name of the Processing Unit. This name is the same as the name of the Processing Unit directory, located under the <GigaSpaces Root>\deploy directory.

    For example, if you copied the Client Processing Unit folder under the deploy directory with the name pp-oms-client, select pp-oms-client in the Processing Unit Name field.
    (When using the scripts supplied in the example, the Processing Units' folders are named pp-oms-client, pp-oms-validator and pp-oms-processor.)



  4. Click Deploy. The following screen should appear, showing the deployment status:



  5. Wait until the Processing Unit successfully finishes deploying, then click Close.
  6. Now, have another look at the lower side of the Deployments tab. You should see that one of the GSCs contains the Processing Unit you just deployed.



  7. Repeat the deployment process twice more for the other Processing Units (remember, you are deploying the Client, Validator and Processor Processing Units).
    At the bottom of the Deployments tab, you should now see:



Expected Output

  1. In the GSC command windows, you should see output lines similar to the following.
    • For the Client:
      .
      .
      CLIENT wrote orderEvent: userName[USER678267834] status[New]
      .
      .
      CLIENT took Processed OrderEvent , Total rejected taken [772], Total processed taken [765]
      .
      .
      CLIENT took Rejected OrderEvent , Total rejected taken [773], Total processed taken [765]
      .
      .
      
    • For the Validator:
      .
      .
      VALIDATOR validating orderEvent:userName[USER1761393384] status[New]
      VALIDATOR set orderEvent status to: Approved
      .
      .
      VALIDATOR validating orderEvent:userName[USER678267834] status[New]
      VALIDATOR set orderEvent status to: Rejected
      .
      .
      
    • For the Processor:
      .
      .
      PROCESSOR: Processing orderEvent:userName[USER1761393384] status[Approved]
      .
      .
      

Running Within Eclipse IDE

You can execute the Processing Units inside your IDE. This enables you to easily modify and test your applications. To do this, we use the OpenSpaces Integrated Processing Unit Container:

  1. Deploy an EDG space instance named spacePP (see EDG Space Instance Deployment above).

    In the next tutorials, when we learn how to use the Processing Unit concept to implement SBA (Space-Based Architecture), we use Processing Units running embedded spaces, instead of deploying an external space instance.

  2. If you haven't done so yet, import your project into Eclipse (this includes defining an Eclipse workspace variable called GS_HOME that points to your <GigaSpaces Root> directory, since all the project JARs are referenced through this variable).
  3. Choose one of the 3 Processing Unit projects: client, validator, or processor from the package navigator:



  4. Click Run > Open Run Dialog to open the Create, manage, and run configurations dialog.
  5. Create a new launch configuration for Java Application:



  6. Name the new configuration and type org.openspaces.pu.container.integrated.IntegratedProcessingUnitContainer in the Main class field:



  7. Click Run to run the chosen Processing Unit inside the integrated container.
  8. Repeat these steps for the other two Processing Units.

How this Works

The org.openspaces.pu.container.integrated.IntegratedProcessingUnitContainer class (found in the OpenSpaces JAR under the lib folder) contains a main method. When executed, it searches the project for the META-INF\spring directory that contains the Processing Unit configuration file (pu.xml), and then loads the Processing Unit.
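
If the container reports that it cannot find a configuration file, a quick check from inside the project can help. The sketch below asks the class loader whether META-INF/spring/pu.xml is visible on the classpath. It is a simplified pre-flight check, not the container's actual lookup logic, and it assumes the project's source or output folder that holds META-INF is on the run configuration's classpath. PuXmlLocator and find are hypothetical names.

```java
import java.net.URL;

class PuXmlLocator {

    /** Location that this tutorial names for the Processing Unit configuration file. */
    static final String PU_XML = "META-INF/spring/pu.xml";

    /** Returns the URL of the resource if it is visible on the classpath, or null. */
    static URL find(String resource) {
        return PuXmlLocator.class.getClassLoader().getResource(resource);
    }

    public static void main(String[] args) {
        URL url = find(PU_XML);
        if (url != null) {
            System.out.println("Found pu.xml at " + url);
        } else {
            System.out.println("No " + PU_XML + " on the classpath; check the project's source folders.");
        }
    }
}
```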

Download Full Example Package


The full Java tutorial example package including execution scripts:

Download the zip file

Extract the zip file into your <GigaSpaces Root> directory.
After unzipping, find the package for this tutorial at <GigaSpaces Root>\examples\Tutorials\Parallel Processing\PP_OrderManagement.

How to import the project to Eclipse IDE after downloading

Full Package Directory Structure

After downloading and extracting, your example folder should look something like this:

PP_Order_Management – tutorial folder
  client – Client Processing Unit project
    src
      com\gigaspaces\examples\tutorials\parallelprocessing\client – Client service beans Java source files
        OrderEventFeeder.java – POJO service bean that feeds orderEvent objects to the space
        OrderEventCounterDisplayer.java – POJO service bean that counts orderEvents and displays their information
      META-INF\spring
        pu.xml – Client Processing Unit configuration XML file
  validator – Validator Processing Unit project
    src
      com\gigaspaces\examples\tutorials\parallelprocessing\validator – Validator service beans Java source files
        OrderEventValidator.java – POJO service bean that validates the orderEvents
      META-INF\spring
        pu.xml – Validator Processing Unit configuration XML file
  processor – Processor Processing Unit project
    src
      com\gigaspaces\examples\tutorials\parallelprocessing\processor – Processor service beans Java source files
        OrderEventProcessor.java – POJO service bean that processes the orderEvents
      META-INF\spring
        pu.xml – Processor Processing Unit configuration XML file
  common – domain model used by all the Processing Units
    src\com\gigaspaces\examples\tutorials\parallelprocessing\common
      OrderEvent.java
  docs – expected output files for the 3 Processing Units
    expected_output_pp-oms-client.txt
    expected_output_pp-oms-validator.txt
    expected_output_pp-oms-processor.txt
  bin – build and compile scripts
    compile.bat/.sh – a script that compiles and builds the 3 Processing Units (Windows/Unix)
    copy_to_deploy_folder.bat/.sh – a script that copies the compiled Processing Units to the GigaSpaces deploy folder
    setExampleEnv.bat/.sh – sets the environment variables for the example scripts
  build.xml – XML file used with Ant to compile/build/prepare for deployment
  readme.txt

Importing Project to Eclipse IDE

  1. If you haven't done so yet, first download GigaSpaces XAP and the example package.
  2. Start Eclipse and use the import existing project option to import the 4 sub-projects:
    parallelprocessing-client, parallelprocessing-validator, parallelprocessing-processor and parallelprocessing-common
    from <GigaSpaces Root>\examples\tutorials\Parallel_Processing\PP_OrderManagment to your workspace.
  3. Add a variable to the Eclipse workspace named GS_HOME pointing to your <GigaSpaces Root> directory.


What's Next?

Try Another Tutorial
GigaSpaces XAP Help Portal
