Spark 2.0 introduces a new mode of computation for Spark, Structured Streaming. This post gives a very basic fully working example of using Structured Streaming. The full code is available here on github.
First, we need a source of streaming data. Like traditional Spark streaming, Spark Structured Streaming can read from a simple socket server, so we can write one in Scala to output the stream we want:
object SocketServer extends App {
val sentences = Array(
"the cat sat on the mat",
"to be or not to be",
"what's the story morning glory"
)
val iterator = Iterator.continually(sentences).flatten
val server = new ServerSocket(9999)
val s = server.accept()
val out = new PrintStream(s.getOutputStream())
while (true) {
out.println(iterator.next())
Thread.sleep(100)
}
}
This code continuously writes the sentences to the client which connects to the socket. So, we can run a basic word count example using Spark Structured Streaming:
object StructuredStreaming extends App {
/*
This is Spark 2.0, so construct a
SparkSession rather than Context.
*/
val spark = SparkSession
.builder
.master("local[*]")
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(l => l.split(" "))
words.explain(true)
// Generate running word count
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
}
When we run this program with the socket program running, it will periodically output an updated word count until we terminate it:
[info] +-------+-----+
[info] | value|count|
[info] +-------+-----+
[info] | not| 37|
[info] | be| 74|
[info] | cat| 37|
[info] | story| 37|
[info] | mat| 37|
[info] | on| 37|
[info] | sat| 37|
[info] |morning| 37|
[info] | glory| 37|
[info] | the| 111|
[info] | what's| 37|
[info] | or| 37|
[info] | to| 74|
[info] +-------+-----+
...
[info] +-------+-----+
[info] | value|count|
[info] +-------+-----+
[info] | not| 48|
[info] | be| 96|
[info] | cat| 48|
[info] | story| 47|
[info] | mat| 48|
[info] | on| 48|
[info] | sat| 48|
[info] |morning| 47|
[info] | glory| 47|
[info] | the| 143|
[info] | what's| 47|
[info] | or| 48|
[info] | to| 96|
[info] +-------+-----+
Note that the counts constantly increase at each reporting interval. Because we
are doing words.groupBy("value").count()
,
we are counting all the words that
we’ve ever seen, and this count is simply being updated at each interval.