Spark-Riak Connector Add-on (Riak TS)
Configuration of Spark Context
The following import statements should be included at the top of your Spark application to enable the connector:

Scala:

```scala
import com.basho.riak.client.core.query.Namespace
import com.basho.riak.spark.rdd.RiakFunctions
import org.apache.spark.{SparkContext, SparkConf}
import com.basho.riak.spark._
```

Python:

```python
import pyspark
```

Java:

```java
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.spark.japi.SparkJavaUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
```
Configuration Options
You can control how your Spark application interacts with Riak by configuring different options for your `SparkContext` or `SQLContext`. You can set these options in `$SPARK_HOME/conf/spark-defaults.conf`. If you don't set an option, it will automatically be set to the default value listed below.
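For example, connector defaults could be set cluster-wide by adding lines like the following to `spark-defaults.conf` (the property names come from the table below; the values shown here are purely illustrative):

```
spark.riak.connection.host    127.0.0.1:8087
spark.riak.connections.min    20
spark.riak.connections.max    50
```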
You can set the following options for the `SparkConf` object:
Property name | Description | Default value | Riak Type |
---|---|---|---|
spark.riak.connection.host | IP:port of a Riak node's protobuf interface | 127.0.0.1:8087 | KV/TS |
spark.riak.connections.min | Minimum number of parallel connections to Riak | 20 | KV/TS |
spark.riak.connections.max | Maximum number of parallel connections to Riak | 30 | KV/TS |
spark.riak.input.fetch-size | Number of keys to fetch in a single round-trip to Riak | 1000 | KV |
spark.riak.input.split.count | Desired minimum number of Spark partitions to divide the data into | 10 | KV |
spark.riak.write.replicas | Quorum value on write. An integer value or a symbolic constant can be used. Possible symbolic constants are: all, one, quorum, default. | default | KV |
spark.riak.connections.inactivity.timeout | Time to keep a connection to Riak alive, in milliseconds | 1000 | KV/TS |
spark.riakts.bindings.timestamp | Whether to treat/convert Riak TS timestamp columns as a Long (UNIX milliseconds) or as a Timestamp during automatic schema discovery. Valid values are: useLong, useTimestamp. | useTimestamp | TS |
spark.riak.partitioning.ts-range-field-name | Name of the quantized field for range queries | 1 | TS |
spark.riakts.write.bulk-size | Bulk size for parallel TS table writes | 100 | TS |
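The TS-specific options can also be supplied per read rather than globally. Here is a minimal sketch, assuming an existing `SQLContext` named `sqlContext` and a TS table `ts_weather` quantized on a `time` column (the table and column names are hypothetical):

```scala
val from = 1443647460000L // lower bound of the time range, UNIX ms
val to   = 1443647470000L // upper bound of the time range, UNIX ms

val df = sqlContext.read
  .format("org.apache.spark.sql.riak")
  // Read TS timestamps back as Long (UNIX milliseconds) instead of Timestamp.
  .option("spark.riakts.bindings.timestamp", "useLong")
  // Split the read into range partitions over the quantized "time" column.
  .option("spark.riak.partitioning.ts-range-field-name", "time")
  .load("ts_weather")
  .filter(s"time >= $from AND time < $to")
```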
Example:

Scala:

```scala
val conf = new SparkConf()
  .setAppName("My Spark Riak App")
  .set("spark.riak.connection.host", "127.0.0.1:8087")
  .set("spark.riak.connections.min", "20")
  .set("spark.riak.connections.max", "50")

val sc = new SparkContext("spark://127.0.0.1:7077", "test", conf)
```

Python:

```python
conf = pyspark.SparkConf().setAppName("My Spark Riak App")
conf.set("spark.riak.connection.host", "127.0.0.1:8087")
conf.set("spark.riak.connections.min", "20")
conf.set("spark.riak.connections.max", "50")
# Pass the SparkConf via the conf keyword; the third positional
# argument of SparkContext is sparkHome, not conf.
sc = pyspark.SparkContext("spark://127.0.0.1:7077", "test", conf=conf)
```

Java:

```java
SparkConf sparkConf = new SparkConf().setAppName("My Spark Riak App");
// setSparkOpt is assumed to be a small helper that applies the option to the
// SparkConf; a plain sparkConf.set(option, value) call works just as well.
setSparkOpt(sparkConf, "spark.riak.connection.host", "127.0.0.1:8087");
setSparkOpt(sparkConf, "spark.riak.connections.min", "20");
setSparkOpt(sparkConf, "spark.riak.connections.max", "50");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
```
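Once the context is configured, it can be used to query a TS table. A minimal Scala sketch, assuming a TS table named `ts_weather` quantized on a `time` column (the table and column names are hypothetical; `riakTSTable` becomes available on the `SparkContext` through the `com.basho.riak.spark._` import shown above):

```scala
val from = 1443647460000L // lower bound of the time range, UNIX ms
val to   = 1443647470000L // upper bound of the time range, UNIX ms

// Build an RDD backed by a range query against the TS table.
val rdd = sc.riakTSTable("ts_weather")
  .sql(s"SELECT * FROM ts_weather WHERE time >= $from AND time <= $to")

println(rdd.count())
```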