Your First Real Time Big Data Analytics Application
Summary: How to use XAP for Real-time analysis of Big Data
Introduction

We live almost every aspect of our lives in a real-time world. Think about our social communications; we update our friends online via social networks and micro-blogging, we text from our cellphones, or message from our laptops. But it's not just our social lives; we shop online whenever we want, we search the web for immediate answers to our questions, we trade stocks online, we pay our bills, and do our banking. All online, and all in real time.

Real time doesn't just affect our personal lives. Enterprises and government agencies need real-time insights to be successful, whether they are investment firms that need fast access to market views and risk analysis, or retailers that need to adjust their online campaigns and recommendations to their customers. Even homeland security has come to rely increasingly on real-time monitoring.

This tutorial explains the challenges of a real-time (RT) analytics system using Twitter as an example, and shows in detail how these challenges can be met using GigaSpaces XAP.

The Challenge

Twitter users aren't just interested in reading the tweets of the people they follow; they are also interested in finding new people and topics to follow based on popularity. This poses several challenges to the Twitter architecture due to the vast volume of tweets. In this example, we focus on the word count use case. The challenge here is straightforward: count the number of occurrences of each word across the entire stream of tweets.
This challenge is not simple to deal with, because the sheer volume of tweets and the analysis they require have knock-on effects throughout the system.
Solution Architecture

In designing a solution, we need to consider the various challenges we must address.

The first challenge is providing unlimited scalability - therefore, we are talking about dynamically increasing resources to meet demand, and hence implementing a distributed solution using a parallelized processing approach.

The second challenge is providing low latency - we can't afford to use a distributed file system such as Hadoop HDFS, a relational database, or a distributed disk-based structured data store such as a NoSQL database. All of these use physical I/O that becomes a bottleneck when dealing with massive writes. Furthermore, we want the business logic collocated with the data on a single platform for faster processing, with minimal network hops and integration issues. To overcome the latency challenge, we use an in-memory system of record. GigaSpaces XAP is built just for that. Its core component is an in-memory data grid (IMDG, a.k.a. the Space) that partitions the data based on a specified attribute within the data object. The data grid uses a shared-nothing policy, and each primary node has a consistent backup. In addition, the grid maintains its SLA by self-healing crashed nodes, so it is completely consistent and highly available.

The third challenge is the efficient processing of the data in a distributed system. To achieve this, we use the Map / Reduce algorithm for distributed computing on large data sets on clusters of computers. In the Map step, we normalize the data so we can create local counters. In the Reduce step, we aggregate the entire set of interim results into a single set of results. In our Twitter example, we need to build a flow that provides the Map / Reduce flow in real time. For this we use XAP's processing and messaging features, collocated with the corresponding data. Our solution therefore uses two modules for persisting and processing the data: a feeder that retrieves the raw tweets and writes them to the Space, and a processor that parses, counts, and persists them.
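To make the partitioning idea concrete, the following is a minimal sketch of registering a document type whose entries are routed by a chosen attribute. It assumes a document type named Tweet routed by its Id property, in line with the example described later in this tutorial; it is illustrative only and is not taken from the project's source.

import org.openspaces.core.GigaSpace;
import com.gigaspaces.metadata.SpaceTypeDescriptor;
import com.gigaspaces.metadata.SpaceTypeDescriptorBuilder;

public class TweetTypeRegistrar {

    /**
     * Registers a "Tweet" document type whose entries are partitioned (routed)
     * by their "Id" property. The type and property names are assumptions used
     * for illustration.
     */
    public static void registerTweetType(GigaSpace gigaSpace) {
        SpaceTypeDescriptor tweetType = new SpaceTypeDescriptorBuilder("Tweet")
                .idProperty("Id")          // unique identifier of each tweet document
                .routingProperty("Id")     // determines the partition each tweet lands in
                .create();
        gigaSpace.getTypeManager().registerTypeDescriptor(tweetType);
    }
}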
The processor's Map phase has the following logical steps: parsing the raw tweets into individual tokens, filtering out non-informative tokens, and counting the occurrences of each remaining token locally, per batch of tweets.
The processor's Reduce phase aggregates the local results into global word counters. The following diagram shows the Map / Reduce flow.

Implementing the Solution as a XAP Application

To implement our solution, we use Cassandra as the historical data tier, and build a XAP application that processes and persists the data in real time using two modules: the feeder, which retrieves tweets and writes them to the Space, and the processor, which performs the real-time Map / Reduce flow and persists the raw tweets.
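As a plain-Java illustration of the two phases (this is not project code), the sketch below shows a self-contained word count: the map step builds local counters for a batch of tweets, and the reduce step merges local counters into the global ones.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {

    // Map step: tokenize a batch of tweets and count tokens locally
    public static Map<String, Integer> mapPhase(List<String> tweets) {
        Map<String, Integer> localCounters = new HashMap<String, Integer>();
        for (String tweet : tweets) {
            for (String token : tweet.toLowerCase().split("\\W+")) {
                if (token.length() == 0) {
                    continue;
                }
                Integer count = localCounters.get(token);
                localCounters.put(token, count == null ? 1 : count + 1);
            }
        }
        return localCounters;
    }

    // Reduce step: merge one set of local counters into the global counters
    public static void reducePhase(Map<String, Integer> globalCounters, Map<String, Integer> localCounters) {
        for (Map.Entry<String, Integer> entry : localCounters.entrySet()) {
            Integer current = globalCounters.get(entry.getKey());
            globalCounters.put(entry.getKey(), current == null ? entry.getValue() : current + entry.getValue());
        }
    }
}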
Building the Application

The following are step-by-step instructions for building the application.

1. Download and install XAP. XAP 9.0 comes with a built-in license that is valid for 3 months after the download.

2. Getting the application. From the command line, run:

cd <project root directory>
git clone <your new repository URL>
We welcome your contributions and suggestions for improvements, and invite you to submit them by opening a pull request. We will review your recommendations and merge the relevant changes. Alternatively, you can download the source files as a zip archive from the repository home page on GitHub.

3. Installing Maven and the GigaSpaces Maven plug-in
4. Building the Application

mvn package
If you get a "No gslicense.xml license file was found in current directory" error, run the following instead:

mvn package -DargLine="-Dcom.gs.home=<XapInstallationRoot>"

where <XapInstallationRoot> is the XAP installation root folder, for example:

mvn package -DargLine="-Dcom.gs.home=c:\gigaspaces-xap-premium-9.0.0-ga"

The Maven build will download the required dependencies, compile the source files, run the unit tests, and build the required jar files. In our example, this produces processing unit jar files for the processor and the feeder modules.
Once the build is complete, a summary message similar to the following is displayed:

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] rt-analytics ...................................... SUCCESS [0.001s]
[INFO] rt-common ......................................... SUCCESS [2.196s]
[INFO] rt-processor ...................................... SUCCESS [11.301s]
[INFO] rt-feeder ......................................... SUCCESS [3.102s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 16.768s
[INFO] Finished at: Sun May 13 13:38:06 IDT 2012
[INFO] Final Memory: 14M/81M
[INFO] ------------------------------------------------------------------------

Running and Debugging the Application within an IDE

Since the application is a Maven project, you can load it in your Java IDE, which automatically configures all modules and classpaths. With IntelliJ, simply click "File -> Open Project" and point to <applicationRoot>/pom.xml. IntelliJ will load the project and present the modules for you. If you're using Eclipse, go to the application root folder, and at the command (Windows) or shell (*nix) prompt type mvn eclipse:eclipse to generate an Eclipse project. Once the project is loaded in your IDE, you can run the application using the following run configurations:
rt-processor project run configuration: launches the IntegratedProcessingUnitContainer class with the rt-processor module's classpath.
rt-feeder project run configuration: launches the IntegratedProcessingUnitContainer class with the rt-feeder module's classpath.
For more information about the IntegratedProcessingUnitContainer class (which runs the processing units within your IDE), see Running and Debugging Within Your IDE.
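Roughly speaking, such a run configuration boils down to invoking the container's main method with the desired cluster settings. The following is a minimal sketch, assuming the processor module (and its pu.xml) is on the classpath; the cluster arguments shown are illustrative and not necessarily those used by the project's own run configurations.

import org.openspaces.pu.container.integrated.IntegratedProcessingUnitContainer;

public class RunProcessorInIde {

    public static void main(String[] args) throws Exception {
        // Starts the processing unit found on the classpath inside the IDE,
        // emulating a partitioned cluster of 2 primary instances with 1 backup each.
        IntegratedProcessingUnitContainer.main(new String[] {
                "-cluster", "schema=partitioned-sync2backup", "total_members=2,1"
        });
    }
}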
To run the application, run the processor configuration, and then the feeder configuration. An output similar to the following is displayed:

2012-05-13 13:44:11,877 INFO [org.openspaces.bigdata.processor.TweetParser] - parsing tweet SpaceDocument [typeName=Tweet, ...
2012-05-13 13:44:12,777 INFO [org.openspaces.bigdata.processor.TweetParser] - parsing tweet SpaceDocument [typeName=Tweet, ...
2012-05-13 13:44:13,777 INFO [org.openspaces.bigdata.processor.TweetParser] - parsing tweet SpaceDocument [typeName=Tweet, ...
2012-05-13 13:44:14,777 INFO [org.openspaces.bigdata.processor.TweetParser] - parsing tweet SpaceDocument [typeName=Tweet, ...
2012-05-13 13:44:15,778 INFO [org.openspaces.bigdata.processor.TweetParser] - parsing tweet SpaceDocument [typeName=Tweet, ...
2012-05-13 13:44:15,804 INFO [org.openspaces.bigdata.processor.TokenFilter] - filtering tweet 4
2012-05-13 13:44:15,805 INFO [org.openspaces.bigdata.processor.TokenFilter] - filtering tweet 1
2012-05-13 13:44:15,805 INFO [org.openspaces.bigdata.processor.TokenFilter] - filtering tweet 3
2012-05-13 13:44:15,806 INFO [org.openspaces.bigdata.processor.TokenFilter] - filtering tweet 5
2012-05-13 13:44:15,806 INFO [org.openspaces.bigdata.processor.TokenFilter] - filtering tweet 2
2012-05-13 13:44:15,893 INFO [org.openspaces.bigdata.processor.LocalTokenCounter] - local counting of a bulk of 5 tweets
2012-05-13 13:44:15,894 INFO [org.openspaces.bigdata.processor.LocalTokenCounter] - writing 22 TokenCounters across the cluster
2012-05-13 13:44:15,921 INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token Report by 1
2012-05-13 13:44:15,925 INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token never by 1
2012-05-13 13:44:15,925 INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token find by 3
2012-05-13 13:44:15,925 INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token the by 1
2012-05-13 13:44:15,925 INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token pee by 1
2012-05-13 13:44:15,926 INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token have by 1
2012-05-13 13:44:15,926 INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token close by 1
2012-05-13 13:44:15,926 INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token didn by 1
2012-05-13 13:44:15,927 INFO [org.openspaces.bigdata.processor.GlobalTokenCounter] - incrementing local token door by 1
Running the Project with the XAP Runtime Environment

The following are step-by-step instructions for running the application in the XAP runtime environment. Start the XAP runtime and deploy the processor processing unit first.
Next, deploy the feeder:

./gs.sh deploy <applicationRoot>/feeder/target/rt-feeder-XAP-9.0

You should see the following output:

Uploading [rt-feeder-XAP-9.0] to [http://127.0.0.1:58313/]
SLA Not Found in PU. Using Default SLA.
Waiting indefinitely for [1] processing unit instances to be deployed...
[rt-feeder-XAP-9.0] [1] deployed successfully on [127.0.0.1]
Finished deploying [1] processing unit instances

Once the application is running, you can use the XAP UI tools to view your application, access the data and the counters, and manage the application.
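You can also access the data programmatically from any XAP client. The following is a minimal sketch, assuming the processor's space is named "space" and is discoverable via the default Jini lookup; the space name and URL are assumptions, so adjust them to match your deployment.

import org.openspaces.core.GigaSpace;
import org.openspaces.core.GigaSpaceConfigurer;
import org.openspaces.core.space.UrlSpaceConfigurer;
import com.gigaspaces.document.SpaceDocument;

public class SpaceClient {

    public static void main(String[] args) {
        // Connect to the remote space deployed by the processor processing unit
        // (the space name "space" is an assumption - use your actual space name).
        GigaSpace gigaSpace = new GigaSpaceConfigurer(
                new UrlSpaceConfigurer("jini://*/*/space").space()).gigaSpace();

        // Count the Tweet documents currently held in memory across the cluster
        int tweetCount = gigaSpace.count(new SpaceDocument("Tweet"));
        System.out.println("Tweets currently in the space: " + tweetCount);
    }
}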
Viewing the Most Popular Words on Twitter

To view the most popular words on Twitter, start the GS-UI using gs-ui.bat (or gs-ui.sh), click the Query icon, and execute the following SQL query:

select uid,* from com.j_spaces.map.Envelope order by value DESC

You should see the most popular words on Twitter ordered by their popularity. You can re-execute the query at any time to refresh the results.

Persisting to Cassandra

Once raw tweets are processed, they are moved from the Space to the historical data backend store. By default, this points to a simple flat-file storage. The example application also includes a Cassandra driver, CassandraExternalPersistence, which implements the ExternalPersistence interface. The following are step-by-step instructions for configuring the application to persist to Cassandra.

1. In the processor's Spring configuration, replace the file-based persistence bean with the Cassandra persister:

<!-- fileExternalPersistence is meant for testing purposes, and persists to a file in the local file system -->
<!--bean id="fileExternalPersistence" class="org.openspaces.bigdata.processor.FileExternalPersistence">
    <constructor-arg index="0" value="tweetRepo.txt"/>
</bean-->

<!-- cassandraExternalPersister persists to a cassandra DB -->
<bean id="cassandraExternalPersister" class="org.openspaces.bigdata.processor.CassandraExternalPersistence"/>

2. Download, install, and start the Cassandra database. For more information, see Cassandra's Getting Started page.

3. Load the example schema into Cassandra:

<cassandra home>/bin/cassandra-cli --host <cassandra host name> --file <project home>/processor/cassandra-schema.txt

4. Run the application within XAP, as described in the previous section.

Running the Example on any Cloud using the Cloudify Component

To run the application together with the Cassandra DB as one application on any cloud, we use the new set of cloud features introduced with XAP 9.0.0. A key concept in this approach is deploying and managing the entire application life cycle using a recipe. This approach provides total application life-cycle automation without any code or architecture change. Furthermore, it is cloud neutral, so you don't get locked in to a specific cloud vendor. The following snippet shows our example application's recipe:

application {
    name="rt_app"
    service {
        name = "feeder"
        dependsOn = ["processor"]
    }
    service {
        name = "processor"
        dependsOn = ["rt_cassandra"]
    }
    service {
        name = "rt_cassandra"
    }
}

The following snippet shows the life-cycle events described in the Cassandra service recipe:

service {
    name "rt_cassandra"
    icon "Apache-cassandra-icon.png"
    numInstances 1
    type "NOSQL_DB"
    lifecycle {
        init "cassandra_install.groovy"
        preStart "cassandra_prestart.groovy"
        start "cassandra_start.groovy"
        postStart "cassandra_poststart.groovy"
    }
    ...
}

The following snippet shows the processing unit described in the processor recipe:

service {
    icon "icon.png"
    name "processor"
    numInstances 2
    statefulProcessingUnit {
        binaries "rt-analytics-processor.jar"
        sla {
            memoryCapacity 32
            maxMemoryCapacity 32
            highlyAvailable true
            memoryCapacityPerContainer 16
        }
    }
}

The application recipe is packaged together with the individual service recipes.

Testing the Application on a Local Cloud

XAP 9.0 comes with a cloud emulator called localcloud that allows you to test the recipe execution on your local machine.
For more information, see the Deploying Applications page.

Running on clouds

To run the application on one of the supported clouds, bootstrap the target cloud and install the application recipe using the Cloudify tools.
The Design in Detail

Now let's take a closer look at the components of the solution. Our solution is designed to efficiently cope with receiving and processing a large volume of tweets. First, we partition the tweets so that we can process them in parallel, but we have to decide how to partition them efficiently. Partitioning by user might not be sufficiently balanced, therefore we decided to partition by the tweet ID, which we assume to be globally unique. Then we need to persist and process the data with low latency, and for this we store the tweets in memory. This section describes the components of the solution that implement these design decisions: getting the tweets, parsing them, filtering out irrelevant tokens, generating the local and global counters, and persisting the tweets to the Big Data store.

Getting the Tweets

First, we need to get the tweets and store them in the Space (IMDG). In this example, we use Spring Social to provide a Java interface to the Twitter API and get the tweets, and XAP's SpaceDocument API to store them. Using a SpaceDocument allows for a more flexible data model (a SpaceDocument is essentially a map of properties). The partitioning uses the default 'Id' attribute. The following snippet shows the relevant TwitterHomeTimelineFeederTask sections.

public class TwitterHomeTimelineFeederTask implements Runnable {
    ...
    public SpaceDocument buildTweet(Tweet tweet) {
        return new SpaceDocument("Tweet", new DocumentProperties()
                .setProperty("Id", tweet.getId())
                .setProperty("Text", tweet.getText())
                .setProperty("CreatedAt", tweet.getCreatedAt())
                .setProperty("FromUserId", tweet.getFromUserId())
                .setProperty("ToUserId", tweet.getToUserId())
                .setProperty("Processed", Boolean.FALSE));
    }

    /**
     * Returns all the tweets from the Twitter public timeline API
     */
    private List<Tweet> getPublicTimeline() {
        return new TwitterTemplate() //
                .timelineOperations() //
                .getPublicTimeline();
    }
    ...
}

Parsing the Tweets

Now the real fun begins. We have the raw data, but we need to tokenize and filter it, and then update the local counters - these are the tasks performed by the Map phase of the Map / Reduce algorithm. To generate this real-time flow, XAP uses the event-driven architecture of its event containers. Specifically, we use a Polling Container to listen for events relating to the writing of raw tweets to the Space. These events are configured using the SQLQuery returned by the unprocessedTweet method marked as @EventTemplate. We then tokenize and filter the tweet in the event handling method marked with @SpaceDataEvent. The result is an object of type TokenizedTweet written to the Space. The following snippet shows the relevant TweetParser sections.

@EventDriven
@Polling(gigaSpace = "gigaSpace", concurrentConsumers = 2, maxConcurrentConsumers = 2, receiveTimeout = 60)
@TransactionalEvent(timeout = 100)
public class TweetParser {
    ...
    /**
     * Event handler that receives a Tweet instance, processes its text and generates a listing of the tokens appearing in the text
     * and their respective count of appearance in the text, instantiates an instance of {@link TokenizedTweet} with this data,
     * and writes it to the space.
     *
     * @param tweet
     * @return {@link TokenizedTweet} containing a mapping of {token->count}
     */
    @SpaceDataEvent
    public SpaceDocument eventListener(SpaceDocument tweet) {
        log.info("parsing tweet " + tweet);
        Long id = (Long) tweet.getProperty("Id");
        String text = tweet.getProperty("Text");
        if (text != null) {
            gigaSpace.write(new TokenizedTweet(id, tokenize(text)));
        }
        tweet.setProperty("Processed", true);
        return tweet;
    }

    protected Map<String, Integer> tokenize(String text) {
        Map<String, Integer> tokenMap = newHashMap();
        StringTokenizer st = new StringTokenizer(text, "\"{}[]:;|<>?`'.,/~!@#$%^&*()_-+= \t\n\r\f\\");
        while (st.hasMoreTokens()) {
            String token = st.nextToken();
            if (token.length() < MIN_TOKEN_LENGTH) {
                continue;
            }
            Integer count = tokenMap.containsKey(token) ? tokenMap.get(token) + 1 : 1;
            tokenMap.put(token, count);
        }
        return tokenMap;
    }
}

Filtering the Tweets

The TokenFilter event handler is triggered by the writing of maps of tokens (TokenizedTweet objects) to the Space (marked as non-filtered). It is implemented as a batch polling container with a batch size of 100 entries. This class is responsible for filtering out a default list of irrelevant words such as prepositions, and can be extended by applying additional value lists stored in the Space as "black lists". The filter updates the TokenizedTweet objects, removing the irrelevant words, and writes them back to the Space. The following snippet shows the relevant TokenFilter sections.

@EventDriven
@Polling(gigaSpace = "gigaSpace", concurrentConsumers = 2, maxConcurrentConsumers = 2, receiveTimeout = 5000)
@TransactionalEvent
public class TokenFilter {
    ...
    /**
     * Event handler that receives a {@link TokenizedTweet} and filters out non-informative tokens. Filtering is performed using
     * {@link #isTokenRequireFilter(String)}
     *
     * @param tokenizedTweet
     * @return the input tokenizedTweet after modifications
     */
    @SpaceDataEvent
    public TokenizedTweet eventListener(TokenizedTweet tokenizedTweet) {
        log.info("filtering tweet " + tokenizedTweet.getId());
        Map<String, Integer> tokenMap = newHashMap(tokenizedTweet.getTokenMap());
        int numTokensBefore = tokenMap.size();
        Iterator<Entry<String, Integer>> it = tokenMap.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, Integer> entry = it.next();
            if (isTokenRequireFilter(entry.getKey())) {
                it.remove();
            }
        }
        int numTokensAfter = tokenMap.size();
        tokenizedTweet.setTokenMap(tokenMap);
        tokenizedTweet.setFiltered(true);
        log.fine("filtered out " + (numTokensBefore - numTokensAfter) + " tokens from tweet " + tokenizedTweet.getId());
        return tokenizedTweet;
    }

    private boolean isTokenRequireFilter(final String token) {
        return filterTokensSet.contains(token);
    }

    private static final Set<String> filterTokensSet = newHashSet("aboard", "about", "above", "across", "after", "against",
            "along", "amid", "among", "anti", "around", "as", "at", "before", "behind", "below", "beneath", "beside", "besides",
            "between", "beyond", "but", "by", "concerning", "considering", "despite", "down", "during", "except", "excepting",
            "excluding", "following", "for", "from", "in", "inside", "into", "like", "minus", "near", "of", "off", "on", "onto",
            "opposite", "outside", "over", "past", "per", "plus", "regarding", "round", "save", "since", "than", "through", "to",
            "toward", "under", "underneath", "unlike", "until", "up", "upon", "versus", "via", "with", "within", "without");
}

Generating the Local Counters

This step completes the Map phase by taking the filtered maps of tokens, normalizing them, and counting the occurrences of relevant words per tweet.
This is achieved using a polling container named LocalTokenCounter that reads batches of filtered TokenizedTweet objects and updates the counters, which are TokenCounter objects in the Space. Note that TokenCounter objects are partitioned by the token whose count they aggregate. The following snippet shows the relevant LocalTokenCounter sections.

@EventDriven
@Polling(gigaSpace = "gigaSpace", passArrayAsIs = true, concurrentConsumers = 1, maxConcurrentConsumers = 1, receiveTimeout = 1000)
@TransactionalEvent
public class LocalTokenCounter {
    ...
    /**
     * Event handler that takes a bulk of {@link TokenizedTweet}, counts appearances of tokens in the bulk, and generates a
     * corresponding {@link TokenCounter} for each token.
     *
     * @param tokenizedTweets
     *            array of {@link TokenizedTweet} matching the event template
     */
    @SpaceDataEvent
    public void eventListener(TokenizedTweet[] tokenizedTweets) {
        log.info("local counting of a bulk of " + tokenizedTweets.length + " tweets");
        Map<String, Integer> tokenMap = newHashMap();
        for (TokenizedTweet tokenizedTweet : tokenizedTweets) {
            log.fine("--processing " + tokenizedTweet);
            for (Entry<String, Integer> entry : tokenizedTweet.getTokenMap().entrySet()) {
                String token = entry.getKey();
                Integer count = entry.getValue();
                int newCount = tokenMap.containsKey(token) ? tokenMap.get(token) + count : count;
                log.finest("put token " + token + " with count " + newCount);
                tokenMap.put(token, newCount);
            }
        }
        log.info("writing " + tokenMap.size() + " TokenCounters across the cluster");
        for (Entry<String, Integer> entry : tokenMap.entrySet()) {
            String token = entry.getKey();
            Integer count = entry.getValue();
            log.fine("writing new TokenCounter: token=" + token + ", count=" + count);
            clusteredGigaSpace.write(new TokenCounter(token, count), LEASE_TTL, WRITE_TIMEOUT, UPDATE_OR_WRITE);
        }
    }
}

Generating Global Counters

Now, the Reduce phase aggregates the local counters into global integer counters. This is achieved using another polling container, named GlobalTokenCounter, that listens for TokenCounter objects. The container reads a batch of TokenCounter objects and updates the global count for each word. The global counter is an entry in the Space where the key is the token and the value is the aggregated count. The following snippet shows the relevant GlobalTokenCounter sections.

@EventDriven
@Polling(gigaSpace = "gigaSpace", concurrentConsumers = 2, maxConcurrentConsumers = 2, receiveTimeout = 1000)
@TransactionalEvent
public class GlobalTokenCounter {
    ...
    @SpaceDataEvent
    public void eventListener(TokenCounter counter) {
        incrementLocalToken(counter.getToken(), counter.getCount());
    }

    @SuppressWarnings("unchecked")
    @Transactional(readOnly = false, propagation = REQUIRED, isolation = READ_COMMITTED)
    private void incrementLocalToken(String token, Integer count) {
        log.info("incrementing local token " + token + " by " + count);
        Integer globalCount = gigaMap.containsKey(token) ? (Integer) gigaMap.get(token) + count : count;
        gigaMap.put(token, globalCount);
        log.fine("+++ token=" + token + " count=" + globalCount);
    }
}

Persisting the Tweets to the Big Data Store

In this example, we use Apache Cassandra as a historical Big Data store, enabling future slicing and dicing of the raw tweet data. Similarly, we could use any other database to persist the data, for example another NoSQL data store or even Hadoop HDFS. TweetPersister is a batch polling container that uses the ExternalPersistence interface.
TweetPersister writes batches of 100 parsed tweets to the NoSQL data store. In this case, we use the CassandraExternalPersistence implementation, which uses the Hector Cassandra client for Java.

public class CassandraExternalPersistence implements ExternalPersistence {
    ...
    @PostConstruct
    public void init() throws Exception {
        log.info(format("initializing connection to Cassandra DB: host=%s port=%d keyspace=%s column-family=%s\n" //
                , host, port, keyspaceName, columnFamily));
        cluster = getOrCreateCluster(keyspaceName, host + ":" + port);
        keyspace = createKeyspace(keyspaceName, cluster);
    }

    @Override
    public void write(Object data) {
        if (!(data instanceof SpaceDocument)) {
            log.log(Level.WARNING, "Received non document event");
            return;
        }
        SpaceDocument document = (SpaceDocument) data;
        Long id = document.getProperty("Id");
        log.info("persisting data with id=" + id);
        Mutator<String> mutator = createMutator(keyspace, stringSerializer);
        for (String key : document.getProperties().keySet()) {
            Object value = document.getProperty(key);
            if (value != null) {
                mutator.addInsertion(String.valueOf(id), //
                        columnFamily, //
                        createColumn(key, value.toString(), stringSerializer, stringSerializer));
            }
        }
        mutator.execute();
    }
    ...
}
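The TweetPersister class itself is not shown in this tutorial. The following is a hypothetical sketch of how such a batch persister event handler could look, under the assumption that processed tweets are marked with Processed=true and handed in bulk to the ExternalPersistence bean; the class name, the event template, and the batch wiring are assumptions for illustration, not the project's actual code.

import javax.annotation.Resource;

import org.openspaces.bigdata.processor.ExternalPersistence;
import org.openspaces.events.EventDriven;
import org.openspaces.events.EventTemplate;
import org.openspaces.events.TransactionalEvent;
import org.openspaces.events.adapter.SpaceDataEvent;
import org.openspaces.events.polling.Polling;
import org.openspaces.events.polling.ReceiveHandler;
import org.openspaces.events.polling.receive.MultiTakeReceiveOperationHandler;
import org.openspaces.events.polling.receive.ReceiveOperationHandler;

import com.gigaspaces.document.SpaceDocument;

@EventDriven
@Polling(gigaSpace = "gigaSpace", passArrayAsIs = true, receiveTimeout = 1000)
@TransactionalEvent
public class TweetPersisterSketch {

    @Resource
    private ExternalPersistence persistence; // e.g. the cassandraExternalPersister bean

    // Take processed tweets out of the Space in batches of up to 100 entries
    @ReceiveHandler
    public ReceiveOperationHandler receiveHandler() {
        MultiTakeReceiveOperationHandler handler = new MultiTakeReceiveOperationHandler();
        handler.setMaxEntries(100);
        return handler;
    }

    // Only tweets that have already been parsed are moved to the historical store
    @EventTemplate
    public SpaceDocument processedTweet() {
        SpaceDocument template = new SpaceDocument("Tweet");
        template.setProperty("Processed", Boolean.TRUE);
        return template;
    }

    @SpaceDataEvent
    public void eventListener(SpaceDocument[] tweets) {
        for (SpaceDocument tweet : tweets) {
            persistence.write(tweet); // delegate each tweet to the external store
        }
    }
}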