Instaclustr Spark with an SSL-Configured Apache Cassandra Cluster
For Legacy Support Purposes Only
A common setup for a Cassandra cluster is to enable client encryption. To use Spark with such a cluster, additional steps must be taken when submitting jobs to configure the Spark Cassandra Connector to use SSL. This guide walks through those steps and clarifies the configuration properties involved.
As a prerequisite to this guide, you should have provisioned and configured a cluster with both Cassandra and Spark. You can find the details on how to do this in sections 1, 2, and 3 of Getting Started with Instaclustr Spark & Cassandra. Remember to select Client ⇄ Node Encryption when creating the cluster to enable client encryption. This option is not available for Developer node sizes, so you must select a Production node size.
Download Truststore File
You will need to download the certificates for your cluster from its Connection Info page.
In the downloaded zip, you will find a Java Key Store file called truststore.jks. This file needs to be included as a resource in the assembled jar in a later step.
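If you want to confirm the download is intact and that the password you were given matches, you can load the file with Java's standard KeyStore API. This is only a sanity-check sketch; substitute your own trust store password for the placeholder:

import java.io.FileInputStream
import java.security.KeyStore

// Load truststore.jks from the current directory; load() throws if the
// file is corrupt or the password is wrong.
val ks = KeyStore.getInstance("JKS")
val in = new FileInputStream("truststore.jks")
ks.load(in, "<trust store password>".toCharArray)
in.close()
println(s"Trust store entries: ${ks.size}")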
Creating and Submitting a Scala Job with SSL Cassandra Connection
In this step of the tutorial, we will demonstrate how to build and submit a Scala job. This is useful when you want to create a job once and submit it multiple times.
- Log in to your Spark client machine
- Create required directories for your project:
mkdir ~/cassandra-count
cd ~/cassandra-count
mkdir -p src/main/scala
mkdir project
mkdir -p src/main/java
mkdir -p src/main/resources
- Create a file called build.sbt in the cassandra-count directory with the following contents (note: the blank lines are important):
name := "cassandra-count"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.1" % "provided"

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.2"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.1" % "provided"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
  case x =>
    val old = (assemblyMergeStrategy in assembly).value
    old(x)
}
- Create a file called assembly.sbt in the cassandra-count/project directory with the following contents (this will include required dependencies in the output jars):
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
- Create a file called cassandra-count.scala in the cassandra-count/src/main/scala directory with the following contents:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object cassandraCount {
  def main(args: Array[String]) {
    // 1. Create a conf for the Spark context.
    // In this example, the Spark master and Cassandra connection details are
    // provided in the separate cassandra-count.conf file at submit time.
    val conf = new SparkConf().setAppName("Counting rows of a cassandra table")

    // 2. Create a Spark context.
    val sc = new SparkContext(conf)

    // 3. Create an RDD that connects to the Cassandra table "keyspaces" of the keyspace "system_schema".
    val rdd = sc.cassandraTable("system_schema", "keyspaces")

    // 4. Count the number of rows.
    val num_row = rdd.count()
    println("\n\n Number of rows in system_schema.keyspaces: " + num_row + "\n\n")

    // 5. Stop the Spark context.
    sc.stop
  }
}
- In order for Spark to connect to Cassandra using SSL, an appropriate SSL context needs to be created on the Spark driver and all the executors. This is achieved by providing SSL-specific properties to the Spark Cassandra Connector. With the default connection factory, the path to the truststore file must be valid on the driver and every executor, which can be restrictive. An alternative is to create a custom connection factory. Next, we will create a custom Cassandra connection class that treats the trust store path property as a resource path rather than a file path. This allows the trust store to be read from a resource inside the assembled jar. Create a file called CustomCassandraConnectionFactory.java in the cassandra-count/src/main/java directory with the following contents:
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
import com.datastax.spark.connector.cql.CassandraConnectionFactory;
import com.datastax.spark.connector.cql.CassandraConnectorConf;
import com.datastax.spark.connector.cql.DefaultScanner;
import com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy;
import com.datastax.spark.connector.cql.MultipleRetryPolicy;
import com.datastax.spark.connector.cql.Scanner;
import com.datastax.spark.connector.rdd.ReadConf;
import scala.collection.IndexedSeq;
import scala.collection.immutable.HashSet;
import scala.collection.immutable.Set;
import scala.reflect.ClassTag;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.Inet4Address;
import java.security.*;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.List;

public class CustomCassandraConnectionFactory implements CassandraConnectionFactory {

    @Override
    public Cluster createCluster(CassandraConnectorConf conf) {
        try {
            return clusterBuilder(conf).build();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Set<String> properties() {
        return new HashSet<String>();
    }

    @Override
    public Scanner getScanner(ReadConf readConf, CassandraConnectorConf connConf, IndexedSeq<String> columnNames) {
        return new DefaultScanner(readConf, connConf, columnNames);
    }

    // Mirrors the default factory's cluster configuration; only the way the
    // SSL options are created differs.
    private Cluster.Builder clusterBuilder(CassandraConnectorConf conf)
            throws CertificateException, NoSuchAlgorithmException, KeyStoreException,
            KeyManagementException, IOException {
        SocketOptions socketOptions = new SocketOptions();
        socketOptions.setConnectTimeoutMillis(conf.connectTimeoutMillis());
        socketOptions.setReadTimeoutMillis(conf.readTimeoutMillis());

        List<Inet4Address> hosts = new ArrayList<Inet4Address>();
        scala.collection.Iterator iter = conf.hosts().toIterator();
        while (iter.hasNext()) {
            Inet4Address a = (Inet4Address) iter.next();
            hosts.add(a);
        }

        Cluster.Builder builder = Cluster.builder()
                .addContactPoints(hosts.toArray(new Inet4Address[0]))
                .withPort(conf.port())
                .withRetryPolicy(new MultipleRetryPolicy(conf.queryRetryCount()))
                .withReconnectionPolicy(new ExponentialReconnectionPolicy(
                        conf.minReconnectionDelayMillis(), conf.maxReconnectionDelayMillis()))
                .withLoadBalancingPolicy(new LocalNodeFirstLoadBalancingPolicy(
                        conf.hosts(), conf.localDC(), true))
                .withAuthProvider(conf.authConf().authProvider())
                .withSocketOptions(socketOptions)
                .withCompression(conf.compression());

        if (conf.cassandraSSLConf().enabled()) {
            SSLOptions options = createSSLOptions(conf.cassandraSSLConf());
            if (null != options) {
                builder = builder.withSSL(options);
            } else {
                builder = builder.withSSL();
            }
        }
        return builder;
    }

    // Treats spark.cassandra.connection.ssl.trustStore.path as a classpath
    // resource, so the trust store can be read from inside the assembled jar.
    SSLOptions createSSLOptions(CassandraConnectorConf.CassandraSSLConf conf)
            throws KeyStoreException, IOException, CertificateException,
            NoSuchAlgorithmException, KeyManagementException {
        if (conf.trustStorePath().isEmpty()) {
            return null;
        }
        try (InputStream trustStore = this.getClass().getClassLoader()
                .getResourceAsStream(conf.trustStorePath().get())) {
            KeyStore keyStore = KeyStore.getInstance(conf.trustStoreType());
            keyStore.load(trustStore, conf.trustStorePassword().isDefined()
                    ? conf.trustStorePassword().get().toCharArray() : null);
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(keyStore);
            SSLContext context = SSLContext.getInstance(conf.protocol());
            context.init(null, tmf.getTrustManagers(), new SecureRandom());
            ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
            return JdkSSLOptions.builder()
                    .withSSLContext(context)
                    .withCipherSuites((String[]) conf.enabledAlgorithms().toArray(tag))
                    .build();
        }
    }
}
- Copy the trust store file downloaded in the earlier step to the cassandra-count/src/main/resources directory.
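To confirm that the trust store actually ends up on the classpath after assembly, a minimal sketch (added temporarily to the job, or run in a REPL with the assembled jar on the classpath) is shown below. It uses the same getResourceAsStream lookup the custom factory relies on:

// Returns null if truststore.jks was not bundled into the jar.
val trustStoreStream = getClass.getClassLoader.getResourceAsStream("truststore.jks")
require(trustStoreStream != null, "truststore.jks not found on classpath")
trustStoreStream.close()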
- Additional properties are needed to configure the SSL connection to Cassandra:
| Property Name | Description |
|---|---|
| spark.cassandra.connection.ssl.enabled | A boolean switch to indicate whether the connection to Cassandra should use SSL |
| spark.cassandra.connection.ssl.trustStore.password | The password matching the trust store |
| spark.cassandra.connection.ssl.trustStore.path | The path to the trust store file. With the custom factory in this example, this is a path to a resource instead |
| spark.cassandra.connection.factory | For overriding the behaviour of the default Spark Cassandra Connector. When used, it should be the name of the class that implements CassandraConnectionFactory. Details of this class can be found at the DataStax Spark Cassandra Connector page at GitHub |

- Create a file called cassandra-count.conf in the cassandra-count directory (this file contains the configuration that will be used when we submit the job):
spark.master spark://<spark_master_private_IP1>:7077,<spark_master_private_IP2>:7077,<spark_master_private_IP3>:7077
spark.executor.memory 1g
spark.cassandra.connection.host <private IP of a cassandra node>
spark.cassandra.auth.username iccassandra
spark.cassandra.auth.password <iccassandra password>
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.eventLog.enabled true
spark.eventLog.dir .
spark.cassandra.connection.ssl.enabled true
spark.cassandra.connection.ssl.trustStore.password <trust store password>
spark.cassandra.connection.ssl.trustStore.path truststore.jks
spark.cassandra.connection.factory CustomCassandraConnectionFactory

- Build the job (from the cassandra-count directory):
sbt assembly
- Submit the job (from cassandra-count directory):
~/spark-2.1.1-bin-hadoop2.6/bin/spark-submit --properties-file cassandra-count.conf --class cassandraCount target/scala-2.11/cassandra-count-assembly-1.0.jar
- You should see a lot of log messages, with the row count message appearing about 15 messages from the end:

Number of rows in system_schema.keyspaces: <row count>
Using Spark Shell
Connecting to Cassandra via SSL from the Spark shell works in the same fashion as spark-submit. The jar containing the custom connection factory and the trust store resource must be added to the list of jar files, and the same configuration properties used to set up the SSL context must be specified. Below is an example spark-shell command:
cd ~/spark-2.1.1-bin-hadoop2.6/bin
./spark-shell --master spark://<spark_master_IP1>:7077,<spark_master_IP2>:7077,<spark_master_IP3>:7077 \
  --conf spark.cassandra.connection.host=<private IP of a cassandra node> \
  --conf spark.cassandra.auth.username=iccassandra \
  --conf spark.cassandra.auth.password=<iccassandra password> \
  --jars ~/spark-cassandra-connector-assembly-2.0.2.jar,$HOME/cassandra-count/target/scala-2.11/cassandra-count-assembly-1.0.jar \
  --conf spark.cassandra.connection.ssl.enabled=true \
  --conf spark.cassandra.connection.ssl.trustStore.password=<trust store password> \
  --conf spark.cassandra.connection.ssl.trustStore.path=truststore.jks \
  --conf spark.cassandra.connection.factory=CustomCassandraConnectionFactory
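Once the shell starts, you can confirm the SSL connection works by repeating the count from the batch job (the table below is the same system_schema.keyspaces table used throughout this guide):

import com.datastax.spark.connector._

// Runs over the SSL connection configured by the flags above.
val count = sc.cassandraTable("system_schema", "keyspaces").count
println(s"Number of rows in system_schema.keyspaces: $count")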
Further Resources
You can find the source code used in this guide at this GitHub page.