Running a Spark Session in a Processing Unit
If you want to run a Spark session directly inside a Processing Unit, GigaSpaces supports using SparkSession. This is the entry point for interacting with Spark, and it enables programming with the Dataset and DataFrame APIs. You can initiate and run a SparkSession inside a Processing Unit with the GigaSpaces SparkSessionProvider.
Creating a SparkSession
The SparkSessionProvider.Builder() enables you to configure and create the SparkSession that you want to run in the Processing Unit. Its functionality is very similar to that of the generic SparkSession.builder() method. See the sample code provided here:
SparkSessionProvider sparkSessionProvider = new SparkSessionProvider.Builder()
        .master("spark://localhost:7077")
        .create();

SparkSessionProvider.Wrapper sparkSessionWrapper = sparkSessionProvider.getOrCreate();
The SparkSessionProvider.getOrCreate() method returns a SparkSessionProvider.Wrapper. If a wrapper already exists (because the method was previously called), the same wrapper is returned. Each call to the method increments a global reference counter.
The wrapper has a close() method that decrements the global reference counter, and closes the underlying SparkSession when the counter reaches zero.
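The following sketch illustrates the reference counting described above (variable and method names are illustrative; sparkSessionProvider is the provider created earlier):

// Illustrative only: wrapper reference counting, based on the behavior
// described above. Wrapper.close() is assumed to declare IOException,
// as it does in the example further below.
void demonstrateReferenceCounting(SparkSessionProvider sparkSessionProvider) throws java.io.IOException {
    // First call creates the wrapper; the global reference counter is now 1
    SparkSessionProvider.Wrapper first = sparkSessionProvider.getOrCreate();
    // A second call returns the same wrapper; the counter is now 2
    SparkSessionProvider.Wrapper second = sparkSessionProvider.getOrCreate();
    first.close();   // counter drops to 1, the SparkSession stays open
    second.close();  // counter drops to 0, the SparkSession is closed
}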
Example
Let’s assume we have a service that exposes a countLines method, as follows:
package com.mycompany.app;

public interface MyService {
    long countLines(String path);
}
An implementation that uses a SparkSession to perform the counting via the DataFrame API would look like this:
package com.mycompany.app;

import org.apache.spark.sql.SparkSession;
import org.insightedge.spark.SparkSessionProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PostConstruct;
import java.io.Closeable;
import java.io.IOException;

public class MyServiceImpl implements MyService, Closeable {

    private static final Logger logger = LoggerFactory.getLogger(MyServiceImpl.class);

    private SparkSessionProvider.Wrapper sparkSessionWrapper;

    @PostConstruct
    public void initialize() {
        // Use SparkSessionProvider.Builder() instead of SparkSession.builder() to create a Spark Session
        SparkSessionProvider sparkSessionProvider = new SparkSessionProvider.Builder()
                .master("spark://localhost:7077")
                .create();
        sparkSessionWrapper = sparkSessionProvider.getOrCreate();
    }

    @Override
    public long countLines(String path) {
        logger.info("Started to count rows of [" + path + "]");
        SparkSession sparkSession = sparkSessionWrapper.get();
        long totalRows = sparkSession.read().text(path).count();
        logger.info("Total rows in file: " + totalRows);
        return totalRows;
    }

    @Override
    public void close() throws IOException {
        if (sparkSessionWrapper != null)
            sparkSessionWrapper.close();
    }
}
You can then initialize the bean with a Configuration class, as follows:
package com.mycompany.app;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyConfiguration {

    @Bean
    public MyServiceImpl myService() {
        return new MyServiceImpl();
    }
}
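Once the context is up, the service can be used like any other Spring bean. The following is a minimal sketch of exercising it with a plain Spring context; the MyServiceDemo class name and the file path are hypothetical:

package com.mycompany.app;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class MyServiceDemo {

    public static void main(String[] args) {
        // Starting the context triggers @PostConstruct on MyServiceImpl,
        // which creates the SparkSession via SparkSessionProvider
        try (AnnotationConfigApplicationContext context =
                     new AnnotationConfigApplicationContext(MyConfiguration.class)) {
            MyService myService = context.getBean(MyService.class);
            // Count the lines of an arbitrary text file (path is a placeholder)
            long rows = myService.countLines("/tmp/sample.txt");
            System.out.println("Counted " + rows + " lines");
        }
        // Closing the context invokes MyServiceImpl.close(),
        // which releases the session wrapper
    }
}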
Spark-related JARs are added to the classloader at runtime, when the SparkSessionProvider is used. As a result, if you use Spark classes before this class is initialized, you will get a ClassNotFoundException. This happens, for example, if a bean that is configured in the pu.xml file has a method that returns SparkSession.
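A hypothetical bean class like the one below illustrates the problem: Spring must load the class, and therefore the SparkSession type in its method signature, while building the context, which is before the Spark JARs are available.

package com.mycompany.app;

import org.apache.spark.sql.SparkSession;

// Hypothetical anti-pattern: declaring this bean in pu.xml forces the class
// to be loaded while the Spring context starts, before SparkSessionProvider
// has added the Spark JARs to the classloader. Loading the class then fails
// with a ClassNotFoundException for org.apache.spark.sql.SparkSession.
public class EagerSparkBean {

    private SparkSession sparkSession;

    // The SparkSession return type is resolved when the class is loaded
    public SparkSession getSparkSession() {
        return sparkSession;
    }
}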