Domain Model POJO Services and Spring WiringThe following tabs describe the Domain Model and the POJO service beans in each of the Processing Units, as well as the Processing Units' configuration files, which deifne their structure:
Domain Model
Domain ModelOur OrderEvent object is implemented as a simple POJO shown in the diagram below: Because our domain model is used throughout the entire application by the different Processing Units, we should define it in a generic place, so we can then include it in each of the Processing Units. As you can see, the code is pretty straightforward, what's interesting though, are the annotations used in it:
/* * Copyright 2007 GigaSpaces Technologies LTD. All rights reserved. * * THIS SOFTWARE IS PROVIDED "AS IS," WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED INCLUDING BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. GIGASPACES WILL NOT * BE LIABLE FOR ANY DAMAGE OR LOSS IN CONNECTION WITH THE SOFTWARE. */ package com.gigaspaces.examples.tutorials.parallelprocessing.common; import com.gigaspaces.annotation.pojo.SpaceClass; import com.gigaspaces.annotation.pojo.SpaceId; /** * OrderEvent object important properties include the orderID * of the object, userName (also used to perform routing when working with partitioned space), * and a status indicating if this OrderEvent object in new, processed, or rejected. * <p> * <code>@SpaceClass</code> annotation in this example is only to indicate that this class is a space class. */ @SpaceClass public class OrderEvent { public static final String STATUS_NEW = "New"; public static final String STATUS_APPROVED = "Approved"; public static final String STATUS_PROCESSED = "Processed"; public static final String STATUS_REJECTED = "Rejected"; /** * ID of the order. */ private String orderID; /** * ID of the client that placed the order. */ private String clientID; /** * User name of the order. */ private String userName; /** * Order status, Possible values: New, Approved, Processed, Rejected * */ private String status; /** * Constructs a new OrderEvent object. * */ public OrderEvent() { } /** * Constructs a new OrderEvent object with the given userName and clientID * and operation. * @param userName * @param clientID */ public OrderEvent(String userName, String clientID) { this.userName = userName; this.clientID = clientID; this.status = STATUS_NEW; } /** * Gets the ID of the orderEvent.<p> * <code>@SpaceID</code> annotation indicates that its value will be auto generated * when it is written to the space. */ @SpaceId(autoGenerate = true) public String getOrderID() { return orderID; } /** * Sets the ID of the orderEvent. * @param orderID */ public void setOrderID(String orderID) { this.orderID = orderID; } /** * @return userName - Gets the user name of the orderEvent object. */ public String getUserName() { return userName; } /** * @param userName - set the user name of the orderEvent object. */ public void setUserName(String userName) { this.userName = userName; } /** * Outputs the orderEvent object attributes. */ public String toString() { return "userName[" + userName + "] status[" + status + "]"; } /** * @return status - the orderEvent status. */ public String getStatus() { return status; } /** * @param status - Sets the orderEvent status. */ public void setStatus(String status) { this.status = status; } /** * @return clientID - Gets the clientID of the orderEvent object. */ public String getClientID() { return clientID; } /** * @param clientID - Sets the orderEvent clientID. */ public void setClientID(String clientID) { this.clientID = clientID; } } Choose another tab (back to top) Client - Feeder Service Client - Feeder Service Bean![]() The Client's Feeder service writes OrderEvents to the space that trigger the process of the application. The service implements the InitializingBean interface, which means the afterPropertiesSet method is called when the service is loaded. Additionaly, it implements DisposableBean, which invokes the destroy method when the service is stopped. The objects are written directly to the space (as defined in the OrderEventFeederTask inner class). The reference to the space proxy is received through injection from the Processing Unit by using the @GigaSpaceContext(name = "gigaSpace") annotation before the declaration of the GigaSpace member. This annotation marks the injection of a GigaSpace object that is defined with a certain name (e.g. gigaSpace) within the Client's pu.xml configuration file. Feeder service bean source code: /* * Copyright 2008 GigaSpaces Technologies Ltd. All rights reserved. * * THIS SOFTWARE IS PROVIDED "AS IS," WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED INCLUDING BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. GIGASPACES WILL NOT * BE LIABLE FOR ANY DAMAGE OR LOSS IN CONNECTION WITH THE SOFTWARE. */ package com.gigaspaces.examples.tutorials.parallelprocessing.client; import org.openspaces.core.GigaSpace; import org.openspaces.core.context.GigaSpaceContext; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.rmi.server.UID; /** * A feeder bean that starts a scheduled task that writes a new OrderEvent object to the space. * * <p>The space is injected into this bean using OpenSpaces support for @GigaSpaceContext * annotation. * * <p>The scheduled support uses the java.util.concurrent Scheduled Executor Service. It * is started and stopped based on Spring life-cycle events. */ public class OrderEventFeeder implements InitializingBean, DisposableBean { private Random randomGen = new Random(); private ScheduledExecutorService executorService; // Delayed result bearing action private ScheduledFuture<?> sf; /** * Delay between scheduled tasks */ private long defaultDelay = 1000; /** * The scheduled orderEvent feeding task. */ private OrderEventFeederTask orderEventFeederTask; @GigaSpaceContext(name = "gigaSpace") private GigaSpace gigaSpace; /** * Unique ID for this client */ private UID clientID; /** * @param defaultDelay - Sets default delay between feeding tasks. */ public void setDefaultDelay(long defaultDelay) { this.defaultDelay = defaultDelay; } /** * The first method run upon bean Initialization when implementing InitializingBean. * Starts a scheduled orderEvent feeding task. */ public void afterPropertiesSet() throws Exception { // Create unique ID for this client clientID = new UID(); System.out.println("CLIENT ["+clientID.toString()+"] Starting feeder with cycle <" + defaultDelay + ">"); // Create a thread pool containing 1 thread capable of performing scheduled tasks executorService = Executors.newScheduledThreadPool(1); orderEventFeederTask = new OrderEventFeederTask(); // Schedule the thread to execute the task at fixed rate with the default delay defined sf = executorService.scheduleAtFixedRate( orderEventFeederTask // The task to schedule ,defaultDelay // Initial Delay before starting ,defaultDelay // Delay between tasks ,TimeUnit.MILLISECONDS // Time unit for the delay ); } public void destroy() throws Exception { // Shutting down the thread pool upon bean disposal sf.cancel(true); sf = null; executorService.shutdown(); } public class OrderEventFeederTask implements Runnable { // Counts number of fed orderEvents private int counter; public void run() { try { // Create a new orderEvent with randomized userName and stamps the clientID OrderEvent orderEvent = new OrderEvent("USER" +randomGen.nextInt(), clientID.toString()); // Write the new orderEvent to the space gigaSpace.write(orderEvent); System.out.println("CLIENT wrote orderEvent: "+orderEvent); } catch (Exception e) { e.printStackTrace(); } } public int getCounter() { return counter; } } public int getFeedCount() { return orderEventFeederTask.getCounter(); } public void setClientID(UID clientID) { this.clientID = clientID; } public UID getClientID() { return clientID; } } Choose another tab (back to top) Client - Counter Service Client - Counter Service Bean![]() This service counts and displays information about OrderEvents that are written to the space with different states. When the service is loaded, its internal counters are reset. After this reset, the counters are updated on each event that is received by the service. In the Client Processing Unit, this service is encapsulated inside two different kinds of event containers as shown in the diagram above. These containers (polling/notify containers) define how and when the Counter service is invoked. The next tab Client - Config shows the differences and how to configure these containers. Counter service bean source code: package com.gigaspaces.examples.tutorials.parallelprocessing.client; import org.openspaces.events.adapter.SpaceDataEvent; import com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent; import java.util.concurrent.atomic.AtomicInteger; /** * A simple bean counting and outputting: * number of processed and rejected orderEvents taken by the client. * * Holds 2 simple counters that are incremented each time a matching event occurs. * Outputting the orderEvent updated status. */ public class OrderEventCounterDisplayer { private AtomicInteger orderEventProcessedCounter = new AtomicInteger(0); private AtomicInteger orderEventRejectedCounter = new AtomicInteger(0); @SpaceDataEvent public void outputInfo(OrderEvent orderEvent) { if (orderEvent.getStatus().equals(OrderEvent.STATUS_PROCESSED)){ orderEventProcessedCounter.incrementAndGet(); } else { if (orderEvent.getStatus().equals(OrderEvent.STATUS_REJECTED)){ orderEventRejectedCounter.incrementAndGet(); } } System.out.println("CLIENT took "+orderEvent.getStatus()+ " OrderEvent , Total rejected taken ["+orderEventRejectedCounter+"], Total processed taken [" +orderEventProcessedCounter+"]"); } } Choose another tab (back to top) Client - Config Client - XML Configuration File![]() By now, the implementation of the Client services should be finished, but the process isn't yet complete. The space configuration and the types of events the services should handle aren't defined in the code. Instead, we define all of these in the configuration file (pu.xml) of each Processing Unit. The polling container and notify container are defined in the Client's pu.xml file: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:os-core="http://www.openspaces.org/schema/core" xmlns:os-events="http://www.openspaces.org/schema/events" xmlns:os-remoting="http://www.openspaces.org/schema/remoting" xmlns:os-sla="http://www.openspaces.org/schema/sla" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.openspaces.org/schema/core http://www.openspaces.org/schema/core/openspaces-core.xsd http://www.openspaces.org/schema/events http://www.openspaces.org/schema/events/openspaces-events.xsd http://www.openspaces.org/schema/remoting http://www.openspaces.org/schema/remoting/openspaces-remoting.xsd http://www.openspaces.org/schema/sla http://www.openspaces.org/schema/sla/openspaces-sla.xsd"> <!-- ================================================================================================================== --> <!-- Enables the usage of @GigaSpaceContext annotation based injection. --> <os-core:giga-space-context/> <!-- A bean representing a space (an IJSpace implementation). Note, we perform a lookup on the space since we are working against a remote space. --> <os-core:space id="space" url="jini://*/*/spacePP"/> <!-- OpenSpaces simplified space API built on top of IJSpace/JavaSpace. --> <os-core:giga-space id="gigaSpace" space="space"/> <!-- Defines a local Jini transaction manager. --> <os-core:local-tx-manager id="transactionManager" space="space"/> <!-- The Data feeder bean, writing new OrderEvents objects to the space in a constant interval. --> <bean id="orderEventFeeder" class="com.gigaspaces.examples.tutorials.parallelprocessing.client.OrderEventFeeder"/> <!-- This bean outputs the orderEvent object --> <bean id="outputOrderEvent" class="com.gigaspaces.examples.tutorials.parallelprocessing.client.OrderEventCounterDisplayer"/> <!-- ================================================================================================================== --> <!-- The notification container, registers for notification on every orderEvent write (notify on write is default) that satisfies the template (in this case with status="Approved"). Upon notification invokes the outputOrderEvent listner on a copy of the object that triggered the event. The notify event container is also configured to automatically perform a take on the notification data event, and configured to filter out events if the take operation returned null (this usually happens when several clients receive this event, and only one succeeds with the take) --> <os-events:notify-container id="orderEventNotifyContainer" giga-space="gigaSpace" perform-take-on-notify="true" ignore-event-on-null-take="true"> <os-events:tx-support tx-manager="transactionManager"/> <os-core:template> <bean class="com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent"> <property name="status" value="Rejected"/> </bean> </os-core:template> <os-events:listener> <os-events:annotation-adapter> <os-events:delegate ref="outputOrderEvent"/> </os-events:annotation-adapter> </os-events:listener> </os-events:notify-container> <!-- A polling event container that performs (by default) polling take operations against the space using the provided template (in this case, the processed orderEvent objects). Once a match is found, the outputOrderEvent bean event listener is triggered using the annotation adapter, the listener method is annotated inside the bean with the @SpaceDataEvent annotation. --> <os-events:polling-container id="orderEventPollingEventContainer" giga-space="gigaSpace"> <os-events:tx-support tx-manager="transactionManager"/> <os-core:template> <bean class="com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent"> <property name="status" value="Processed"/> </bean> </os-core:template> <os-events:listener> <os-events:annotation-adapter> <os-events:delegate ref="outputOrderEvent"/> </os-events:annotation-adapter> </os-events:listener> </os-events:polling-container> </beans> Choose another tab (back to top) Validator - Validator Service Validator - Validator Service Bean![]() The OrderEventValidator takes OrderEvents with status New from the space, and sets their status randomly to Approved or Rejected, simulating a validation process. When you follow the code, you can see that there is no indication of the type of OrderEvents handled by the Validator. All this logic is defined in the Validator's pu.xml file, as well as the type of operation the Validator performs on the space (by default, a take operation of a single object that matches the defined template), and at which interval. The service implementation does not need to implement any specific interface or to have any special code. The only thing necessary is the use of the @SpaceDataEvent annotation, which marks the method that handles the event (in this case, the event is a take of an OrderEvent from the space). Note that the same event-handling method (validatesOrderEvent in our case) has a return value. This returned object is written back to the space by the container. Validator service bean source code: package com.gigaspaces.examples.tutorials.parallelprocessing.validator; import org.openspaces.events.adapter.SpaceDataEvent; import com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent; /** * Simple bean used to validate the orderEvent objects. */ public class OrderEventValidator { private long workDuration = 100; /** * Sets the simulated work duration (in milliseconds). Default to 100. */ public void setWorkDuration(long workDuration) { this.workDuration = workDuration; } /** * Validate the given OrderEvent object and return the validated OrderEvent with * status field set to Approved/Rejected according to the validation. * * Can be invoked using OpenSpaces Events when a matching event * occurs or using OpenSpaces remoting. */ @SpaceDataEvent // This annotation marks the method as the event listener. public OrderEvent validatesOrderEvent(OrderEvent orderEvent) { // sleep to simulate some work try { Thread.sleep(workDuration); } catch (InterruptedException e) { // do nothing } System.out.println("VALIDATOR validating orderEvent: "+orderEvent); // For the sake of the example the status is arbitrarily randomized. if ( Math.random() >= 0.5) { orderEvent.setStatus(OrderEvent.STATUS_APPROVED); } else { orderEvent.setStatus(OrderEvent.STATUS_REJECTED); } System.out.println("VALIDATOR set orderEvent status to: "+orderEvent.getStatus()); // orderID is declared as primary key and as auto-generated. // It must be null before writing an operation. orderEvent.setOrderID(null); return orderEvent; } } Choose another tab (back to top) Validator - Config Validator - XML Configuration file![]() Just like in the Client Processing Unit, the space configuration and the types of events the Validator services should handle are defined in the Validator Processing Unit configuration file (pu.xml): <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:os-core="http://www.openspaces.org/schema/core" xmlns:os-events="http://www.openspaces.org/schema/events" xmlns:os-remoting="http://www.openspaces.org/schema/remoting" xmlns:os-sla="http://www.openspaces.org/schema/sla" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.openspaces.org/schema/core http://www.openspaces.org/schema/core/openspaces-core.xsd http://www.openspaces.org/schema/events http://www.openspaces.org/schema/events/openspaces-events.xsd http://www.openspaces.org/schema/remoting http://www.openspaces.org/schema/remoting/openspaces-remoting.xsd http://www.openspaces.org/schema/sla http://www.openspaces.org/schema/sla/openspaces-sla.xsd"> <!-- ============================================================================================= --> <!-- Enables the usage of @GigaSpaceContext annotation based injection. --> <os-core:giga-space-context/> <!-- A bean representing a space (an IJSpace implementation). Note, we do not specify here the cluster topology of the space. It is declared outside of the processing unit or within the SLA bean. --> <os-core:space id="space" url="jini://*/*/spacePP"/> <!-- OpenSpaces simplified space API built on top of IJSpace/JavaSpace. --> <os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/> <!-- Defines a local Jini transaction manager. --> <os-core:local-tx-manager id="transactionManager" space="space"/> <!-- The orderEvent validator bean --> <bean id="orderEventValidator" class="com.gigaspaces.examples.tutorials.parallelprocessing.validator.OrderEventValidator"/> <!-- ========================================================================================================================= --> <!-- A polling event container that performs (by default) polling take operations against the space using the provided template (in our case, the new orderEvents objects). Once a match is found, the orderEvent validator bean event listener is triggered using the annotation adapter, the listener method is annotated inside the bean with the @SpaceDataEvent annotation. --> <os-events:polling-container id="orderEventValidatorPollingEventContainer" giga-space="gigaSpace"> <os-events:tx-support tx-manager="transactionManager"/> <os-core:template> <bean class="com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent"> <property name="status" value="New"/> </bean> </os-core:template> <os-events:listener> <os-events:annotation-adapter> <os-events:delegate ref="orderEventValidator"/> </os-events:annotation-adapter> </os-events:listener> </os-events:polling-container> </beans> Choose another tab (back to top) Processor - Processor Service Processor - Processor Service![]() Very similiar to how we implemented the OrderEventValidator, the OrderEventProcessor has a method that is annotated with @SpaceDataEvent to mark it as the handler of events. Again, the events are taken as OrderEvents from the space, this time only orders with status Approved that were validated by the Validator. /* * Copyright 2008 GigaSpaces Technologies LTD. All rights reserved. * * THIS SOFTWARE IS PROVIDED "AS IS," WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED INCLUDING BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. GIGASPACES WILL NOT * BE LIABLE FOR ANY DAMAGE OR LOSS IN CONNECTION WITH THE SOFTWARE. */ package com.gigaspaces.examples.tutorials.parallelprocessing.processor; import org.openspaces.events.adapter.SpaceDataEvent; import com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent; /** * The processor uses OpenSpaces support for annotation mark-up allowing to use @SpaceDataEvent to * mark a method as an event listener. Note, processOrderEvent does not use any space * API on the OrderEvent object (though it can), receiving the OrderEvent object to be processed * and returning the result that will be automatically written back to the space. * * <p>You can set the simulated work done when processOrderEvent is called by setting the work * duration (defaults to 100 ms). * * <p>Note, changing the event container is just a matter of xml configuration (for example, * switching from polling container to notify container) and does not affect this class. */ public class OrderEventProcessor { private long workDuration = 100; /** * Sets the simulated work duration (in milliseconds). Default to 100. **/ public void setWorkDuration(long workDuration) { this.workDuration = workDuration; } /** * Process the given OrderEvent object and returns the processed OrderEvent. * <p> * Annotated @SpaceDataEvent - Marks this method to be executed when an event occurs. **/ @SpaceDataEvent public OrderEvent processOrderEvent(OrderEvent orderEvent) { // sleep to simulate some work try { Thread.sleep(workDuration); } catch (InterruptedException e) { // do nothing } System.out.println("PROCESSOR Processing orderEvent: "+orderEvent); // In this simple example the only processing work will be changing the orederEvent status. orderEvent.setStatus(OrderEvent.STATUS_PROCESSED); // orderID is declared as primary key and as auto-generated. // It must be null before writing an operation. orderEvent.setOrderID(null); return orderEvent; } } Choose another tab (back to top) Processor - Config Processor - XML Configuration File![]() Again, like in the Client and Validator Processing Units, the space configuration and the types of events the processor services should handle are defined in the configuration file (pu.xml): <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:os-core="http://www.openspaces.org/schema/core" xmlns:os-events="http://www.openspaces.org/schema/events" xmlns:os-remoting="http://www.openspaces.org/schema/remoting" xmlns:os-sla="http://www.openspaces.org/schema/sla" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.openspaces.org/schema/core http://www.openspaces.org/schema/core/openspaces-core.xsd http://www.openspaces.org/schema/events http://www.openspaces.org/schema/events/openspaces-events.xsd http://www.openspaces.org/schema/remoting http://www.openspaces.org/schema/remoting/openspaces-remoting.xsd http://www.openspaces.org/schema/sla http://www.openspaces.org/schema/sla/openspaces-sla.xsd"> <!-- ============================================================================================================ --> <!-- Enables the usage of @GigaSpaceContext annotation based injection. --> <os-core:giga-space-context/> <!-- A bean representing a space (an IJSpace implementation). Note, we do not specify here the cluster topology of the space. It is declared outside of the processing unit or within the SLA bean. --> <os-core:space id="space" url="jini://*/*/spacePP"/> <!-- OpenSpaces simplified space API built on top of IJSpace/JavaSpace. --> <os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/> <!-- Defines a local Jini transaction manager. --> <os-core:local-tx-manager id="transactionManager" space="space"/> <!-- The orderEvent processor bean --> <bean id="orderEventProcessor" class="com.gigaspaces.examples.tutorials.parallelprocessing.processor.OrderEventProcessor"/> <!-- ============================================================================================================ --> <!-- A polling event container that performs (by default) polling take operations with txn support against the space using the provided template (in our case, the approved orderEvent objects). Once a match is found, the data processor bean event listener is triggered using the annotation adapter (the method anotated @SpaceDataEvent inside the class is invoked). --> <os-events:polling-container id="orderEventProcessorPollingEventContainer" giga-space="gigaSpace"> <os-events:tx-support tx-manager="transactionManager"/> <os-core:template> <bean class="com.gigaspaces.examples.tutorials.parallelprocessing.common.OrderEvent"> <property name="status" value="Approved"/> </bean> </os-core:template> <os-events:listener> <os-events:annotation-adapter> <os-events:delegate ref="orderEventProcessor"/> </os-events:annotation-adapter> </os-events:listener> </os-events:polling-container> </beans> Choose another tab (back to top) |
![]() |
GigaSpaces.com - Legal Notice - 3rd Party Licenses - Site Map - API Docs - Forum - Downloads - Blog - White Papers - Contact Tech Writing - Gen. by Atlassian Confluence |