Spark-Riak Connector Add-on (Riak TS)
Bulk Write
To write into a Riak TS table, the Spark-Riak Connector splits the initial set of rows into smaller bulks and processes them in parallel. The bulk size can be configured with the spark.riakts.write.bulk-size property; the default value is 100.
As an example, let’s say your RDD has 2000 rows and you set spark.riakts.write.bulk-size to 200 and spark.riak.connections.min to 5. Then there will be 10 bulks of 200 rows each, and each bulk will be written over 5 parallel connections to Riak. The bulk size option can be configured in SparkConf:
Scala:

val conf = new SparkConf().set("spark.riakts.write.bulk-size", "500")

Python:

conf = pyspark.SparkConf()
conf.set("spark.riakts.write.bulk-size", "500")
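For context, here is a minimal end-to-end sketch of the 2000-row example above. It assumes the connector's com.basho.riak.spark._ import and its saveToRiakTS action on an RDD of Rows; the table name and row schema are placeholders, so adjust them to your TS table:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import com.basho.riak.spark._

val conf = new SparkConf()
  .setAppName("riak-ts-bulk-write-example")
  .set("spark.riakts.write.bulk-size", "200")
  .set("spark.riak.connections.min", "5")

val sc = new SparkContext(conf)

// 2000 rows are split into 10 bulks of 200; each bulk is written over up to 5 parallel connections
val rows = sc.parallelize(1 to 2000).map(i => Row(1, "family", i.toLong, i.toDouble)) // placeholder row schema
rows.saveToRiakTS("example_ts_table") // placeholder table name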
Or you can set the spark.riakts.write.bulk-size property in the DataFrame writer’s .option():
Scala:

// df is an existing DataFrame whose schema matches the target TS table
df.write
  .option("spark.riakts.write.bulk-size", "500")
  .format("org.apache.spark.sql.riak")
  .mode(SaveMode.Append)
  .save(bucketName)

Python:

# df is an existing DataFrame whose schema matches the target TS table
df.write \
  .option("spark.riakts.write.bulk-size", "500") \
  .format("org.apache.spark.sql.riak") \
  .mode("append") \
  .save(bucketName)
Bulks will be written in parallel. The number of parallel writes for each partition is defined by the spark.riak.connections.min property (default is 20):
Scala:

val conf = new SparkConf()
  .set("spark.riakts.write.bulk-size", "500")
  .set("spark.riak.connections.min", "50")

Python:

conf = pyspark.SparkConf()
conf.set("spark.riakts.write.bulk-size", "500")
conf.set("spark.riak.connections.min", "50")