Apache Spark is a powerful tool for the distributed processing of data. Data, whether represented as RDDs or as DataFrames, is processed using transformations (such as map and filter), which are performed lazily, meaning that they are only executed when their results are needed for some output. In some cases, a particular RDD or DataFrame may be used as the basis for multiple different downstream computations, or the same computation may be performed multiple times.

In these cases, it can be useful to cache the data of an RDD that is used more than once, to prevent it from being computed multiple times.

In this post, the procedure for caching data is explored, along with the limitations and options that apply to caching.

Environment setup

Similar examples could be run on any Spark cluster, but the caching results may differ depending on the version of Spark used and the resources assigned to the Spark components.

These examples were run using Spark 1.5.2, specifically the binary distribution built against Hadoop 2.6.

The following spark-defaults.conf was used:

spark.driver.memory 1g
spark.executor.memory 1g
spark.master spark://<your machine name>:7077

The following spark-env.sh was used:

export SPARK_WORKER_MEMORY=1g
export SPARK_WORKER_CORES=1
export SPARK_DRIVER_MEMORY=1g

The environment variable SPARK_HOME is set to the location to which the Spark download archive was extracted. Spark is started with:

$SPARK_HOME/sbin/start-master.sh

$SPARK_HOME/sbin/start-slave.sh spark://<your machine name>:7077

You should be able to navigate to http://localhost:8080/ to see the Web UI for the Spark Master.

Examples

The interactive Spark REPL can be started by running $SPARK_HOME/bin/spark-shell.

A small RDD can be created and cached in the REPL with:

val smallRdd = sc.parallelize(List("a", "b", "c")).setName("small RDD")
smallRdd.cache().foreach(_ => {})

The foreach is necessary since even caching is performed lazily. An RDD is cached the first time an action causes it to be computed; that cached data is then available to any subsequent usages. Using foreach in this manner avoids bringing any data back to the driver, which can be costly in terms of performance.
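
Any action triggers the caching; count(), for instance, also forces the RDD to be computed and cached, and only returns a single number to the driver. A sketch of that alternative (the RDD here is purely illustrative):

val smallRdd2 = sc.parallelize(List("a", "b", "c")).setName("small RDD 2")
smallRdd2.cache().count() // returns a Long rather than the RDD's contents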

After running this example in the spark shell (leaving the REPL open to keep the application alive), you should be able to navigate to the Application’s Web UI from the Master Web UI. The Application UI will probably be at http://localhost:4040.

On the Storage tab, you should see an entry such as:

RDD Name  | Storage Level                     | Cached Partitions | Fraction Cached | Size in Memory | Size in ExternalBlockStore | Size on Disk
small RDD | Memory Deserialized 1x Replicated | 2                 | 100%            | 192.0 B        | 0.0 B                      | 0.0 B

Calling cache() on an RDD is simply a special case of calling the persist() method with a storage level of MEMORY_ONLY. Another option is MEMORY_ONLY_SER, which serializes the contents of the RDD into byte buffers rather than caching the Java objects themselves. This generally leads to significantly smaller memory usage.

For example:

import org.apache.spark.storage.StorageLevel

val mediumRdd = sc.parallelize(List(1 to 5000)).
  map(i => s"Some data $i").
  setName("med RDD")

mediumRdd.persist(StorageLevel.MEMORY_ONLY).
  foreach(_ => {})

val mediumRddSer = sc.parallelize(List(1 to 5000)).
  map(i => s"Some data $i").
  setName("med RDD Serialized")

mediumRddSer.persist(StorageLevel.MEMORY_ONLY_SER).
  foreach(_ => {})

The Storage tab of the application UI should show entries such as:

RDD Name           | Storage Level                     | Cached Partitions | Fraction Cached | Size in Memory | Size in ExternalBlockStore | Size on Disk
med RDD            | Memory Deserialized 1x Replicated | 2                 | 100%            | 4.9 KB         | 0.0 B                      | 0.0 B
med RDD Serialized | Memory Serialized 1x Replicated   | 2                 | 100%            | 2.4 KB         | 0.0 B                      | 0.0 B

This difference in cached size can make the difference between data fitting in cache or not, which could have significant performance implications.
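
The figures shown on the Storage tab can also be inspected from the shell. The following is a sketch using sc.getRDDStorageInfo, a developer API that should be available in this version of Spark:

// Print the cached size of each persisted RDD, similar to the Storage tab
sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: ${info.memSize} bytes in memory, " +
    s"${info.numCachedPartitions} of ${info.numPartitions} partitions cached")
}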

In some cases, Spark may not have sufficient memory to cache the entire RDD, but may still be able to cache some of its partitions.

For example:

val largeRdd = sc.parallelize(0 to 7999).
  flatMap(i => (0 to 999).map(x => i * 1000 + x)).
  map(i => s"data value of $i")

largeRdd.setName("large RDD").persist(StorageLevel.MEMORY_ONLY).foreach(_ => {})

This results in the following entry in the Storage tab:

RDD Name  | Storage Level                     | Cached Partitions | Fraction Cached | Size in Memory | Size in ExternalBlockStore | Size on Disk
large RDD | Memory Deserialized 1x Replicated | 1                 | 50%             | 341.15 MB      | 0.0 B                      | 0.0 B

In other cases, there may not be enough memory to cache any of the partitions. This does not result in an error or exception from Spark, but the data will not be cached, and subsequent executions that use the RDD will need to recompute it.
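
If recomputing the RDD is expensive, one option (not benchmarked here) is the MEMORY_AND_DISK storage level, which writes partitions that do not fit in memory to local disk instead of dropping them. A sketch along the lines of the large RDD above:

import org.apache.spark.storage.StorageLevel

// Partitions that do not fit in memory are spilled to local disk
val largeRddDisk = sc.parallelize(0 to 7999).
  flatMap(i => (0 to 999).map(x => i * 1000 + x)).
  map(i => s"data value of $i").
  setName("large RDD with disk spill")

largeRddDisk.persist(StorageLevel.MEMORY_AND_DISK).foreach(_ => {})

With this storage level, the Storage tab would be expected to show a non-zero Size on Disk for any partitions that did not fit in memory.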

Using foreach in Python

When using pyspark, caching works in the same way, but foreach requires a function argument, and here we want a function that performs no operations. One way is to define a no_op function, as shown below.

Launch pyspark with

$SPARK_HOME/bin/pyspark

Then execute the following:

def no_op(x):
    pass

sc.parallelize(['a', 'b', 'c']).cache().foreach(no_op)
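
As with the Scala examples, the cached RDD should then appear on the Storage tab of the application's Web UI once the foreach has run.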