Apache Spark is a powerful tool for distributed data processing. Data, whether represented as RDDs or as DataFrames, is processed using transformations (such as map and filter) which are performed lazily, meaning that they are only executed when their results are needed for some output.
In some cases, a particular RDD or DataFrame may be used as the basis for multiple downstream computations, or the same computation may be performed multiple times.
In these cases, it can be useful to cache an RDD that is used more than once, to prevent it from being recomputed each time.
In this post, the procedure for caching data is explored, along with the limitations and options that apply to caching.
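As a quick sketch of the idea (assuming a SparkContext sc, as provided by the Spark shell; the RDD and values here are purely illustrative), an RDD that feeds several actions is recomputed for each one unless it is cached first:
val numbers = sc.parallelize(1 to 100).map(_ * 2)
numbers.cache()                 // mark the RDD to be cached when it is first computed
val total = numbers.sum()       // first action: computes the RDD and caches the result
val howMany = numbers.count()   // second action: reads the cached data instead of recomputing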
Environment setup
Similar examples to these could be run on any Spark cluster, but the caching results may differ depending on the version of Spark used, and the resources assigned to the Spark components.
These examples were run using Spark 1.5.2, specifically the binary distribution built against Hadoop 2.6.
The following spark-defaults.conf
was used:
spark.driver.memory 1g
spark.executor.memory 1g
spark.master spark://<your machine name>:7077
The following spark-env.sh
was used:
export SPARK_WORKER_MEMORY=1g
export SPARK_WORKER_CORES=1
export SPARK_DRIVER_MEMORY=1g
The environment variable SPARK_HOME
is set to the location where the Spark download archive was extracted. Spark is started with:
$SPARK_HOME/sbin/start-master.sh
$SPARK_HOME/sbin/start-slave.sh spark://<your machine name>:7077
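For reference, SPARK_HOME might be set with something like the following (the path is only an example and depends on where the archive was extracted):
export SPARK_HOME=/opt/spark-1.5.2-bin-hadoop2.6  # example path; adjust to your extraction location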
You should be able to navigate to http://localhost:8080/
to see
the Web UI for the Spark Master.
Examples
The interactive Spark REPL can be started by running $SPARK_HOME/bin/spark-shell.
A small RDD can be created and cached in the REPL with:
val smallRdd = sc.parallelize(List("a", "b", "c")).setName("small RDD")
smallRdd.cache().foreach(_ => {})
The foreach
is necessary since even caching is performed lazily: the data is cached the first time an action computes the RDD, and that cached data is then available to any
subsequent uses. Using foreach
in this manner avoids bringing any data back to
the driver, which could be costly in terms of performance.
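If returning a single number to the driver is acceptable, any other action would also trigger the caching; for example, count() computes (and therefore caches) the RDD while only sending the element count back to the driver:
smallRdd.cache().count()   // count() is an action, so it also forces the RDD to be computed and cached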
After running this example in the Spark shell (leaving the REPL open to keep
the application alive), you should be able to navigate to the Application’s
Web UI from the Master Web UI. The Application UI will probably be at
http://localhost:4040.
On the Storage tab, you should see an entry such as:
RDD Name | Storage Level | Cached Partitions | Fraction Cached | Size in Memory | Size in ExternalBlockStore | Size on Disk |
---|---|---|---|---|---|---|
small RDD | Memory Deserialized 1x Replicated | 2 | 100% | 192.0 B | 0.0 B | 0.0 B |
Calling cache()
on an RDD
is simply a special case of calling the persist()
method with a storage level of MEMORY_ONLY.
Another option is MEMORY_ONLY_SER,
which serializes the contents of the RDD into byte buffers rather than caching the
Java objects themselves. This generally leads to significantly smaller memory usage.
For example:
import org.apache.spark.storage.StorageLevel
val mediumRdd = sc.parallelize(List(1 to 5000)).
  map(i => s"Some data $i").
  setName("med RDD")
mediumRdd.persist(StorageLevel.MEMORY_ONLY).
  foreach(_ => {})
val mediumRddSer = sc.parallelize(List(1 to 5000)).
  map(i => s"Some data $i").
  setName("med RDD Serialized")
mediumRddSer.persist(StorageLevel.MEMORY_ONLY_SER).
  foreach(_ => {})
The Storage tab of the application UI should show entries such as:
RDD Name | Storage Level | Cached Partitions | Fraction Cached | Size in Memory | Size in ExternalBlockStore | Size on Disk |
---|---|---|---|---|---|---|
med RDD | Memory Deserialized 1x Replicated | 2 | 100% | 4.9 KB | 0.0 B | 0.0 B |
med RDD Serialized | Memory Serialized 1x Replicated | 2 | 100% | 2.4 KB | 0.0 B | 0.0 B |
This difference in cached size can make the difference between data fitting in cache or not, which could have significant performance implications.
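The cached sizes can also be inspected from the REPL rather than the Web UI; for instance, sc.getRDDStorageInfo (a developer API on SparkContext) returns per-RDD storage statistics:
// Print name, cached partition count, and in-memory size for each cached RDD
sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: ${info.numCachedPartitions} partitions cached, ${info.memSize} bytes in memory")
}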
In some cases, Spark may not have sufficient memory to cache the entire RDD, but may still be able to cache some of its partitions.
For example:
val largeRdd = sc.parallelize(0 to 7999).
  flatMap(i => (0 to 999).
    map(x => i*1000 + x)).
  map(i => s"data value of $i")
largeRdd.setName("large RDD").persist(StorageLevel.MEMORY_ONLY).foreach(_ => {})
This results in the following entry in the Storage tab:
RDD Name | Storage Level | Cached Partitions | Fraction Cached | Size in Memory | Size in ExternalBlockStore | Size on Disk |
---|---|---|---|---|---|---|
large RDD | Memory Deserialized 1x Replicated | 1 | 50% | 341.15 MB | 0.0 B | 0.0 B |
In other cases, there may not be enough memory to cache any partitions. This will not result in an error or exception from Spark; the data simply will not be cached, and subsequent executions using the RDD will need to recompute it.
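If recomputation is expensive, one option worth knowing about is a storage level that spills to disk, such as MEMORY_AND_DISK, which keeps the partitions that fit in memory and writes the rest to disk rather than dropping them. A minimal sketch, reusing the largeRdd from above (the storage level of an already-persisted RDD cannot be changed, so it is unpersisted first):
largeRdd.unpersist()   // drop the earlier MEMORY_ONLY cache before changing the storage level
largeRdd.persist(StorageLevel.MEMORY_AND_DISK).foreach(_ => {})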
Using foreach in Python
When using pyspark,
caching works in the same way, but foreach needs to be given a function,
and here we want that function to do nothing. One way is to define a no_op
function:
Launch pyspark with:
$SPARK_HOME/bin/pyspark
Then execute the following:
def no_op(x):
    pass

sc.parallelize(['a', 'b', 'c']).cache().foreach(no_op)
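Alternatively, an inline lambda that does nothing could be passed instead of defining a separate function:
sc.parallelize(['a', 'b', 'c']).cache().foreach(lambda _: None)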