Summary: Allows archiving of space objects to an external persistency service.
Overview
The archive container is used to transfer historical data into Big-Data storage (for example, Cassandra). The typical scenario is streaming a vast number of raw events through the space, enriching them, and then moving them to Big-Data storage. Typically, there is no intention of keeping these events in the space or querying them there.
Archive Container vs Space Persistency
The Archive Container differs from Space Persistency in the following ways:
Archive Container running side-by-side with Space Persistency
There are use cases in which the same space uses both an Archive Container and Space Persistency. Normally, types that are archived by the Archive Container should not be handled by Space Persistency, so these types should be marked as transient.

Configuration
Here is a simple example of an archive container configuration:
Annotation
    <!-- Enable scan for OpenSpaces and Spring components -->
    <context:component-scan base-package="com.mycompany"/>

    <!-- Enable support for @Archive annotation -->
    <os-archive:annotation-support />

    <os-core:space id="space" url="/./space" />
    <os-core:distributed-tx-manager id="transactionManager" space="space"/>
    <os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager" tx-timeout="120"/>

    <os-archive:cassandra-archive-handler id="cassandraArchiveHandler"
        giga-space="gigaSpace"
        hosts="${cassandra.hosts}"
        keyspace="${cassandra.keyspace}" />

    @Archive(batchSize = 100)
    @TransactionalEvent(timeout = 120)
    public class ExpiredTweetsArchiveContainer {

        @DynamicEventTemplate
        public SQLQuery<SpaceDocument> getUnprocessedExpiredTweets() {
            // Cutoff: tweets older than one minute (60000 ms), recomputed before every take
            final long expired = System.currentTimeMillis() - 60000;
            final boolean processed = true;
            final SQLQuery<SpaceDocument> dynamicTemplate = new SQLQuery<SpaceDocument>(
                "Tweet", "Processed = ? AND Timestamp < ?", processed, expired);
            return dynamicTemplate;
        }
    }

Namespace

    <os-core:space id="space" url="/./space" />
    <os-core:distributed-tx-manager id="transactionManager" space="space"/>
    <os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/>

    <os-archive:cassandra-archive-handler id="cassandraArchiveHandler"
        giga-space="gigaSpace"
        hosts="${cassandra.hosts}"
        keyspace="${cassandra.keyspace}" />

    <os-archive:archive-container id="archiveContainer"
        giga-space="gigaSpace"
        archive-handler="cassandraArchiveHandler"
        batch-size="${archiver.batch.size}">
        <os-archive:tx-support tx-manager="transactionManager" tx-timeout="120"/>
        <os-core:template ref="archiveFilter" />
    </os-archive:archive-container>

    <bean id="archiveFilter" class="ExpiredTweetsFilter"/>

    public class ExpiredTweetsFilter implements DynamicEventTemplateProvider {

        @Override
        public Object getDynamicEventTemplate() {
            final long expired = System.currentTimeMillis() - 60000;
            final boolean processed = true;
            final SQLQuery<SpaceDocument> dynamicTemplate = new SQLQuery<SpaceDocument>(
                "Tweet", "Processed = ? AND Timestamp < ?", processed, expired);
            return dynamicTemplate;
        }
    }

Plain XML

    <bean id="space" class="org.openspaces.core.space.UrlSpaceFactoryBean">
        <property name="url" value="/./space" />
    </bean>

    <bean id="gigaSpace" class="org.openspaces.core.GigaSpaceFactoryBean">
        <property name="space" ref="space" />
    </bean>

    <bean id="cassandraArchiveHandler" class="org.openspaces.persistency.cassandra.archive.CassandraArchiveOperationHandler">
        <property name="gigaSpace" ref="gigaSpace"/>
        <property name="hosts" value="${cassandra.hosts}" />
        <property name="keyspace" value="${cassandra.keyspace}" />
    </bean>

    <bean id="archiver" class="org.openspaces.archive.ArchivePollingContainer">
        <property name="gigaSpace" ref="gigaSpace" />
        <property name="dynamicTemplate" ref="archiveFilter" />
        <property name="archiveHandler" ref="cassandraArchiveHandler" />
    </bean>

    <bean id="archiveFilter" class="ExpiredTweetsFilter"/>

    public class ExpiredTweetsFilter implements DynamicEventTemplateProvider {

        @Override
        public Object getDynamicEventTemplate() {
            final long expired = System.currentTimeMillis() - 60000;
            final boolean processed = true;
            final SQLQuery<SpaceDocument> dynamicTemplate = new SQLQuery<SpaceDocument>(
                "Tweet", "Processed = ? AND Timestamp < ?", processed, expired);
            return dynamicTemplate;
        }
    }

Code

    PlatformTransactionManager txManager = new DistributedJiniTxManagerConfigurer().transactionManager();

    UrlSpaceConfigurer urlSpaceConfigurer = new UrlSpaceConfigurer("/./space");
    IJSpace space = urlSpaceConfigurer.create();
    GigaSpace gigaSpace = new GigaSpaceConfigurer(space).transactionManager(txManager).create();

    // cassandraKeyspace and cassandraHost hold the Cassandra keyspace name and host(s)
    ArchiveOperationHandler cassandraArchiveHandler = new CassandraArchiveOperationHandlerConfigurer()
        .keyspace(cassandraKeyspace)
        .hosts(cassandraHost)
        .gigaSpace(gigaSpace)
        .create();

    // One fluent configurer chain: no semicolon until create()
    ArchivePollingContainer archiveContainer = new ArchivePollingContainerConfigurer(gigaSpace)
        .archiveHandler(cassandraArchiveHandler)
        .transactionManager(txManager)
        .batchSize(100)
        .dynamicTemplate(new DynamicEventTemplateProvider() {
            @Override
            public Object getDynamicEventTemplate() {
                final long expired = System.currentTimeMillis() - 60000;
                final boolean processed = true;
                final SQLQuery<SpaceDocument> dynamicTemplate = new SQLQuery<SpaceDocument>(
                    "Tweet", "Processed = ? AND Timestamp < ?", processed, expired);
                return dynamicTemplate;
            }
        })
        .create();

    // To free the resources used by the archive container, make sure you close it properly.
    // A good life cycle event for the destroy() calls is a @PreDestroy or DisposableBean#destroy() method.
    archiveContainer.destroy();
    cassandraArchiveHandler.destroy();
    urlSpaceConfigurer.destroy();

The examples above remove (using takeMultiple) processed Tweet objects whose Timestamp is more than 60 seconds old from the space and write them to Cassandra. For a real-world example, see the streaming-bigdata example.

Primary/Backup
The archive container performs take operations only when the space it is working against is in primary mode. When the space is in backup mode, no take operations are performed. If the space moves from backup mode to primary mode, the take operations start.
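To make the primary/backup rule concrete, here is a minimal plain-Java simulation. It is not the GigaSpaces API: SpaceMode, SimulatedSpace and GatedArchiver are hypothetical names invented for illustration. The archiver polls on every cycle but performs a take, and hands objects to storage, only while the space reports primary mode.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for the space's role in a primary/backup pair.
enum SpaceMode { PRIMARY, BACKUP }

// Hypothetical in-memory "space": a queue of objects plus a mode flag.
class SimulatedSpace {
    final Deque<String> objects = new ArrayDeque<>();
    volatile SpaceMode mode = SpaceMode.BACKUP;

    // Removes up to maxEntries objects, loosely mimicking a takeMultiple call.
    List<String> takeMultiple(int maxEntries) {
        List<String> taken = new ArrayList<>();
        while (taken.size() < maxEntries && !objects.isEmpty()) {
            taken.add(objects.poll());
        }
        return taken;
    }
}

// Hypothetical archiver: each call to pollOnce() is one polling cycle.
class GatedArchiver {
    final SimulatedSpace space;
    final List<String> archived = new ArrayList<>(); // stands in for external storage
    final int batchSize;

    GatedArchiver(SimulatedSpace space, int batchSize) {
        this.space = space;
        this.batchSize = batchSize;
    }

    // Returns the number of objects archived in this cycle.
    int pollOnce() {
        if (space.mode != SpaceMode.PRIMARY) {
            return 0; // backup mode: no take operations at all
        }
        List<String> batch = space.takeMultiple(batchSize);
        archived.addAll(batch);
        return batch.size();
    }

    public static void main(String[] args) {
        SimulatedSpace space = new SimulatedSpace();
        for (int i = 0; i < 5; i++) space.objects.add("tweet-" + i);
        GatedArchiver archiver = new GatedArchiver(space, 3);

        System.out.println(archiver.pollOnce()); // 0: the space is still a backup
        space.mode = SpaceMode.PRIMARY;          // the backup becomes primary
        System.out.println(archiver.pollOnce()); // 3
        System.out.println(archiver.pollOnce()); // 2
    }
}
```

Objects are never removed from a backup, so nothing is archived twice when both members of the pair run an archive container.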
Static Template Definition
When removing objects from the space, a template is defined, and it selects the subset of objects in the space that match it. GigaSpaces supports templates based on the actual domain model (with null values denoting wildcards). GigaSpaces also allows the use of SQLQuery to query the space, and an SQLQuery can easily be used as the container's template, as the examples on this page do. Here is an example of how a static template can be defined:
Annotation
    @Archive
    public class ProcessedTweetsFilter {

        @EventTemplate
        public SQLQuery<SpaceDocument> processedTweets() {
            final boolean processed = true;
            final SQLQuery<SpaceDocument> staticTemplate = new SQLQuery<SpaceDocument>(
                "Tweet", "Processed = ?", processed);
            return staticTemplate;
        }
    }

Namespace

    <os-archive:archive-container id="archiveContainer"
        giga-space="gigaSpace"
        archive-handler="cassandraArchiveHandler"
        batch-size="${archiver.batch.size}">
        <os-archive:tx-support tx-manager="transactionManager"/>
        <os-core:sql-query where="processed = true" class-name="Tweet"/>
    </os-archive:archive-container>

Plain XML

    <bean id="archiveContainer" class="org.openspaces.archive.ArchivePollingContainer">
        <property name="gigaSpace" ref="gigaSpace" />
        <property name="archiveHandler" ref="cassandraArchiveHandler" />
        <property name="batchSize" value="100" />
        <property name="template">
            <bean class="com.j_spaces.core.client.SQLQuery">
                <constructor-arg index="0" value="Tweet" />
                <constructor-arg index="1" value="processed = true" />
            </bean>
        </property>
    </bean>

Dynamic Template Definition
When removing objects from the space, a dynamic template can be used. The method providing the dynamic template is called before each take operation and can return a different object on each call. The Configuration examples above use a dynamic template so that the expiration cutoff is recomputed on every take.

Batch Operations
Archiving in batches may improve processing throughput. Instead of consuming objects from the space and archiving them one at a time, the container can consume a batch of multiple objects and process them in one transaction. This may improve the overall throughput rate, but may increase the latency of archiving each individual object. The archive operation handler declares whether it can archive more than one object at a time by implementing ArchiveOperationHandler.supportsBatchArchiving().

Transaction Support
Both the space take operation and the archive action should be configured to be performed under a transaction.
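Because a whole batch is handed to the archive handler inside one transaction, it helps to see how the batch size and supportsBatchArchiving() shape the calls the handler receives. The plain-Java sketch below is a simulation only: BatchAwareHandler is a hypothetical interface that mirrors the idea of ArchiveOperationHandler.supportsBatchArchiving() but is not the real GigaSpaces interface, and drain() is an invented helper. When the handler supports batching, objects arrive in chunks of up to batchSize; otherwise they arrive one at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical handler contract, loosely modeled on the idea behind
// ArchiveOperationHandler.supportsBatchArchiving(); not the real interface.
interface BatchAwareHandler {
    boolean supportsBatchArchiving();
    void archive(List<String> objects);
}

// Handler that records the size of every archive call it receives.
class RecordingHandler implements BatchAwareHandler {
    final boolean batching;
    final List<Integer> callSizes = new ArrayList<>();

    RecordingHandler(boolean batching) { this.batching = batching; }

    @Override public boolean supportsBatchArchiving() { return batching; }

    @Override public void archive(List<String> objects) { callSizes.add(objects.size()); }
}

class BatchingDemo {
    // Hypothetical helper: hands all pending objects to the handler, using
    // batchSize only when the handler declares batch support.
    static void drain(List<String> pending, BatchAwareHandler handler, int batchSize) {
        int chunk = handler.supportsBatchArchiving() ? batchSize : 1;
        for (int from = 0; from < pending.size(); from += chunk) {
            int to = Math.min(from + chunk, pending.size());
            // Copy so the handler does not keep a view into the caller's list.
            handler.archive(new ArrayList<>(pending.subList(from, to)));
        }
    }

    public static void main(String[] args) {
        List<String> pending = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
        RecordingHandler batched = new RecordingHandler(true);
        RecordingHandler single = new RecordingHandler(false);
        drain(pending, batched, 3);
        drain(pending, single, 3);
        System.out.println(batched.callSizes); // [3, 3, 1]
        System.out.println(single.callSizes);  // [1, 1, 1, 1, 1, 1, 1]
    }
}
```

Fewer, larger calls amortize per-call overhead (the throughput gain), while the first object of a batch waits for the rest of the batch (the latency cost).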
When an exception occurs in the archiver under a transaction, the take operation is rolled back and the object is returned to the space. If an exception occurs during a batch archive operation, the complete batch of objects is returned to the space. The next time objects are archived, they are taken from the space and archived again. This means that the archive operation handler's archive() method needs to do one of the following:
When using transactions with the archive container, take special care with timeout values. Transactions started by the archive container can have a timeout value associated with them (if none is set, the transaction manager's default timeout of 60 seconds applies). If you set a specific timeout value, make sure it is higher than the receive-timeout and the ArchiveOperationHandler#archive() execution time combined. Transaction support is added by injecting a transaction manager into the archive-container and giga-space beans; see the examples in the Configuration section.
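The rollback behavior and the timeout rule can be sketched in plain Java under stated assumptions. This is a simulation, not GigaSpaces code. The "transaction" is modeled by putting the taken batch back at the head of the queue when archiving throws. IdempotentStore is a hypothetical target that upserts by id, so replaying a batch that was partially written before a failure does not create duplicates. txTimeoutIsSafe() encodes the rule above: the transaction timeout must exceed the receive-timeout plus the archive time. All names are invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical external store that upserts by id: writing the same object twice
// leaves a single copy, which makes replays after a rollback harmless.
class IdempotentStore {
    final Map<String, String> rows = new LinkedHashMap<>();
    int failuresLeft; // number of simulated transient failures still to inject

    IdempotentStore(int failuresLeft) { this.failuresLeft = failuresLeft; }

    void archive(List<String> batch) {
        for (int i = 0; i < batch.size(); i++) {
            String id = batch.get(i);
            rows.put(id, "archived:" + id);
            // Fail after the first write to mimic a partially written batch.
            if (i == 0 && failuresLeft > 0) {
                failuresLeft--;
                throw new IllegalStateException("store unavailable");
            }
        }
    }
}

class RollbackDemo {
    // One polling cycle: take a batch, archive it, and on failure "roll back"
    // by returning the complete batch to the front of the queue in order.
    static boolean cycle(Deque<String> space, IdempotentStore store, int batchSize) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && !space.isEmpty()) batch.add(space.poll());
        try {
            store.archive(batch);
            return true; // commit: the objects stay removed from the space
        } catch (IllegalStateException e) {
            for (int i = batch.size() - 1; i >= 0; i--) space.addFirst(batch.get(i));
            return false; // rollback: the whole batch is back in the space
        }
    }

    // Timeout rule: tx timeout > receive-timeout + archive() time (all in ms).
    static boolean txTimeoutIsSafe(long txTimeoutMs, long receiveTimeoutMs, long archiveTimeMs) {
        return txTimeoutMs > receiveTimeoutMs + archiveTimeMs;
    }

    public static void main(String[] args) {
        Deque<String> space = new ArrayDeque<>();
        space.add("t1");
        space.add("t2");
        space.add("t3");
        IdempotentStore store = new IdempotentStore(1); // the first attempt fails

        System.out.println(cycle(space, store, 3)); // false: rolled back
        System.out.println(space);                  // [t1, t2, t3]
        System.out.println(cycle(space, store, 3)); // true: replayed and committed
        System.out.println(store.rows.size());      // 3, no duplicates despite the replay
        System.out.println(txTimeoutIsSafe(120_000, 60_000, 30_000)); // true
    }
}
```

The upsert-by-id store is only one way to tolerate replays; the key property is that archiving the same batch twice leaves the target in the same state as archiving it once.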
Default Values of Archive Container Configuration Parameters
The default values for all of the polling container configuration parameters, such as concurrent-consumers, batch-size, receive-timeout and others, can be found in the JavaDoc (and sources) of the ArchivePollingContainer class and its superclasses, SimplePollingEventListenerContainer and AbstractPollingEventListenerContainer.

FIFO Grouping
FIFO Grouping is designed to allow efficient processing of events with partial ordering constraints. Instead of maintaining a FIFO queue per class type, it provides finer granularity by maintaining a FIFO queue per value of a specific property. For more details, see FIFO Grouping.

Concurrent Consumers
By default, the archive container starts a single thread that performs take operations and invokes the archive handler. It can be configured to start several concurrent consumer threads, with an upper limit on the number of concurrent consumer threads. This provides faster archiving; however, any FIFO behavior that might be configured in the space and/or template is lost.
Here is an example of an archive container with three concurrent consumers, a maximum of five concurrent consumers, and FIFO grouping enabled:
Annotation
    @Archive(batchSize = 100, concurrentConsumers = 3, maxConcurrentConsumers = 5, useFifoGrouping = true)
    @TransactionalEvent(timeout = 120)
    public class ExpiredTweetsFilter {

        @DynamicEventTemplate
        public SQLQuery<SpaceDocument> expiredTweets() {
            // ...
        }
    }

Namespace

    <os-archive:archive-container id="archiveContainer"
        giga-space="gigaSpace"
        batch-size="100"
        concurrent-consumers="3"
        max-concurrent-consumers="5"
        useFifoGrouping="true">
        <!-- ... -->
    </os-archive:archive-container>

Plain XML

    <bean id="archiveContainer" class="org.openspaces.archive.ArchivePollingContainer">
        <property name="batchSize" value="100" />
        <property name="concurrentConsumers" value="3" />
        <property name="maxConcurrentConsumers" value="5" />
        <property name="useFifoGrouping" value="true" />
        <!-- ... -->
    </bean>
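The ordering guarantee behind FIFO grouping (concurrency across groups, strict order within a group) can be illustrated with a plain-Java sketch. This uses a swapped-in technique and is not how GigaSpaces implements FIFO grouping. Each group value is hashed to one of a fixed number of single-threaded lanes, so objects of the same group are always processed by the same thread in arrival order, while different groups proceed concurrently. FifoGroupingSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FifoGroupingSketch {
    // Processes {group, payload} pairs on `lanes` threads, preserving order within each group.
    static Map<String, List<String>> process(List<String[]> events, int lanes) {
        List<ExecutorService> executors = new ArrayList<>();
        for (int i = 0; i < lanes; i++) executors.add(Executors.newSingleThreadExecutor());

        Map<String, List<String>> processed = new ConcurrentHashMap<>();
        for (String[] event : events) {
            String group = event[0];
            String payload = event[1];
            // Same group -> same lane -> same thread, so submission order is execution order.
            int lane = Math.floorMod(group.hashCode(), lanes);
            executors.get(lane).submit(() -> {
                processed.computeIfAbsent(group, g -> Collections.synchronizedList(new ArrayList<String>()))
                         .add(payload);
            });
        }
        for (ExecutorService executor : executors) executor.shutdown();
        try {
            for (ExecutorService executor : executors) executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining lanes", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String[]> events = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            events.add(new String[] {"user-" + (i % 2), "tweet-" + i});
        }
        Map<String, List<String>> result = process(events, 3);
        System.out.println(result.get("user-0")); // [tweet-0, tweet-2, tweet-4]
        System.out.println(result.get("user-1")); // [tweet-1, tweet-3, tweet-5]
    }
}
```

Without the per-group lane (for example, if every event went to a shared thread pool), tweet-2 could be processed before tweet-0. That reordering is the FIFO loss that plain concurrent consumers introduce.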
GigaSpaces.com - Legal Notice - 3rd Party Licenses - Site Map - API Docs - Forum - Downloads - Blog - White Papers - Contact Tech Writing - Gen. by Atlassian Confluence |