Hands-On Deep Learning with Apache Spark

Spark Streaming

Spark Streaming is another Spark module that extends the core Spark API and provides a scalable, fault-tolerant, and efficient way of processing live streaming data. By converting streaming data into micro-batches, Spark's simple batch programming model can be applied to streaming use cases too. This unified programming model makes it easy to combine batch and interactive data processing with streaming. Data can be ingested from diverse sources (Kafka, Kinesis, TCP sockets, S3, or HDFS, to mention just a few of the popular ones) and processed using any of the high-level functions available in Spark. Finally, the processed data can be persisted to RDBMS, NoSQL databases, HDFS, object storage systems, and so on, or consumed through live dashboards. Nothing prevents other advanced Spark components, such as MLlib or GraphX, from being applied to data streams:

Figure 1.8

The following diagram shows how Spark Streaming works internally: it receives live input data streams and divides them into batches, which are then processed by the Spark engine to generate the final batches of results:

Figure 1.9
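
The micro-batching idea can be illustrated without Spark. The following plain-Python sketch is not how Spark implements it; the helper name to_micro_batches and the sample records are made up for illustration. It groups timestamped records into consecutive batches of a fixed batch interval:

```python
from collections import defaultdict


def to_micro_batches(records, batch_interval):
    """Group (timestamp, value) records into fixed-length batches.

    Hypothetical helper for illustration only; Spark's receivers and
    scheduler do considerably more than this.
    """
    batches = defaultdict(list)
    for timestamp, value in records:
        # Records whose timestamps fall in [k * interval, (k + 1) * interval)
        # belong to batch k.
        batch_index = int(timestamp // batch_interval)
        batches[batch_index].append(value)
    if not batches:
        return []
    # Emit one (possibly empty) batch per interval, in time order.
    return [batches.get(i, []) for i in range(max(batches) + 1)]


# Made-up sample stream: (arrival time in seconds, line of text).
stream = [(0.2, "a b"), (0.7, "b c"), (1.1, "c"), (3.5, "a")]
batches = to_micro_batches(stream, batch_interval=1)
print(batches)
```

Each inner list plays the role of one batch handed to the engine; an interval in which nothing arrived still yields an (empty) batch.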

The high-level abstraction of Spark Streaming is the DStream (short for Discretized Stream), which is a wrapper around a continuous flow of data. Internally, a DStream is represented as a sequence of RDDs. A DStream contains a list of other DStreams that it depends on, a function to convert its input RDDs into output ones, and a time interval at which to invoke the function. DStreams are created either by manipulating existing ones, for example, by applying a map or filter function (which internally creates a MappedDStream or a FilteredDStream, respectively), or by reading from an external source (the base class in these cases is InputDStream).
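
To make the three ingredients of a DStream (its dependencies, its conversion function, and its interval) more concrete, here is a toy model in plain Python. It is only a sketch under simplifying assumptions: the class SketchDStream is hypothetical, plain lists stand in for RDDs, and nothing here reflects Spark's actual classes beyond the general shape described above.

```python
class SketchDStream:
    """Toy stand-in for a DStream: a parent, a function, and an interval."""

    def __init__(self, interval, parent=None, func=None, source=None):
        self.interval = interval  # time between batches, in seconds
        self.parent = parent      # the stream this one depends on, if any
        self.func = func          # turns the parent's batch into this stream's batch
        self.source = source      # input streams only: batch time -> records

    def compute(self, batch_time):
        """Return the batch (a list standing in for an RDD) for batch_time."""
        if self.parent is None:
            return list(self.source.get(batch_time, []))
        return self.func(self.parent.compute(batch_time))

    def map(self, f):
        # Deriving a stream creates a new object that depends on this one.
        return SketchDStream(self.interval, parent=self,
                             func=lambda batch: [f(x) for x in batch])

    def filter(self, predicate):
        return SketchDStream(self.interval, parent=self,
                             func=lambda batch: [x for x in batch if predicate(x)])


# Made-up input: records keyed by the batch time at which they arrived.
source = {0: [1, 2, 3], 1: [4, 5]}
numbers = SketchDStream(interval=1, source=source)
even_squares = numbers.map(lambda x: x * x).filter(lambda x: x % 2 == 0)
result = [even_squares.compute(t) for t in (0, 1)]
print(result)
```

Calling map or filter does not process any data; it only records the dependency, and the work happens when a batch is requested for a given time.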

Let's implement a simple Scala example: a self-contained streaming word count application. The code for this example can be found among the examples that are bundled with the Spark distribution. To compile and package it, you need to add the Spark Streaming dependency to your Maven, Gradle, or sbt project descriptor, along with the Spark Core and Scala dependencies.

First, we have to create the SparkConf and a StreamingContext (which is the main entry point for any streaming functionality) from it:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(1))

The batch interval has been set to 1 second. A DStream representing streaming data from a TCP source can be created using the ssc streaming context; we just need to specify the source hostname and port, as well as the desired storage level:

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

The returned lines DStream is the stream of data that is going to be received from the server. Each record will be a single line of text that we want to split into single words, thus specifying the space character as a separator:

val words = lines.flatMap(_.split(" "))

Then, we will count those words:

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

The words DStream is mapped (a one-to-one transformation) to a DStream of (word, 1) pairs, which is then reduced to get the frequency of words in each batch of data. The last command prints the first ten counts generated in each one-second batch. Each RDD in a DStream contains data from a certain interval; any operation applied on a DStream translates to operations on the underlying RDDs:

Figure 1.10
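
The per-batch nature of the count can be seen in a small plain-Python sketch that needs no Spark installation. The function name and the sample lines below are made up, and lists stand in for the RDDs of each batch:

```python
from collections import Counter


def count_words_per_batch(batches):
    """Apply the same split-and-count step to every batch independently."""
    results = []
    for lines in batches:
        # Equivalent in spirit to flatMap(split) followed by a count per key.
        words = [word for line in lines for word in line.split(" ")]
        results.append(dict(Counter(words)))
    return results


# Made-up batches of text lines, one list per one-second interval.
batches = [["hello world", "hello spark"], ["spark streaming"]]
per_batch_counts = count_words_per_batch(batches)
print(per_batch_counts)
```

Note that the word spark is counted once in each batch rather than twice overall: without a windowed or stateful operation, counts do not carry over from one batch to the next.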

To start the processing after all the transformations have been set up, use the following code:

ssc.start()
ssc.awaitTermination()

Before running this example, first you will need to run netcat (a small utility found in most Unix-like systems) as a data server:

nc -lk 9999

Then, in a different Terminal, you can start the example by passing the following as arguments:

localhost 9999

Any line typed into the Terminal that is running the netcat server will be counted, and the counts will be printed on the application screen every second.

If nc is not available on the system where you run this example, you can implement your own data server in Scala:

import java.io.PrintWriter
import java.net.ServerSocket
import java.util.Scanner

object SocketWriter {
  def main(args: Array[String]): Unit = {
    val listener = new ServerSocket(9999)
    // Block until the streaming application connects.
    val socket = listener.accept()

    // socketTextStream reads newline-delimited text, so write whole lines and auto-flush.
    val out = new PrintWriter(socket.getOutputStream, true)
    println("Start writing data. Enter close when finished")
    val sc = new Scanner(System.in)
    // Read lines from standard input and write them to the socket until "close" is entered.
    var str = sc.nextLine()
    while (str != "close") {
      out.println(str)
      str = sc.nextLine()
    }
    // Close the connection now.
    out.close()
    socket.close()
    listener.close()
  }
}

The same self-contained application in Python could be as follows:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    # The second argument is the batch interval in seconds.
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

DStreams support most of the transformations that are available for RDDs. This means that data from input DStreams can be modified in the same way as the data in RDDs. The following table lists some of the common transformations supported by Spark DStreams:

 

Windowed computations are provided by Spark Streaming. As shown in the following diagram, they allow you to apply transformations over sliding windows of data:

Figure 1.11

When a window slides over a source DStream, all its RDDs that fall within that window are taken into account and transformed to produce the RDDs of the returned windowed DStream. In the specific example shown in the preceding diagram, the window-based operation is applied over three time units of data and slides by two time units. Any window operation requires two parameters:

  • Window length: The duration of the window
  • Sliding interval: The interval at which the window operation is performed

These two parameters must be multiples of the batch interval of the source DStream.
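
The interplay of the two parameters can be sketched in plain Python. This is an illustration only: the function sliding_windows is hypothetical, both parameters are expressed as a number of batches (which mirrors the multiple-of-the-batch-interval requirement), and for simplicity only complete windows are produced.

```python
def sliding_windows(batches, window_length, slide_interval):
    """Return the batches covered by each complete window.

    Both parameters are numbers of batches, that is, multiples of the
    batch interval. Partial windows at the start of the stream are skipped
    here to keep the sketch short.
    """
    windows = []
    for end in range(window_length, len(batches) + 1, slide_interval):
        # Each window covers the window_length most recent batches.
        windows.append(batches[end - window_length:end])
    return windows


# Made-up batches labelled by time unit.
batches = ["t1", "t2", "t3", "t4", "t5"]
windows = sliding_windows(batches, window_length=3, slide_interval=2)
print(windows)
```

With a window length of three and a sliding interval of two, consecutive windows overlap by one batch (t3 in this sample), so that batch contributes to two results.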

Let's see how this could be applied to the application that was presented at the beginning of this section. Suppose you want to generate a word count every 10 seconds over the last 60 seconds of data. The reduceByKey operation needs to be applied on the DStream of (word, 1) pairs over the last 60 seconds of data. This can be achieved with the reduceByKeyAndWindow operation. In the following code, pairs is that DStream of (word, 1) pairs, that is, the result of words.map(x => (x, 1)) in the earlier example. When translated into Scala code, this is as follows:

val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(60), Seconds(10))

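For Python, it is as follows. Note that this variant also passes an inverse reduce function (the second argument), which lets Spark update the previous window's result incrementally and requires checkpointing to be enabled on the streaming context: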

windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 60, 10)
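
The Python call above passes both a reduce function (x + y) and its inverse (x - y). The idea behind the inverse is that a new window result can be derived from the previous one by adding the batches that entered the window and subtracting those that left it, instead of recounting everything. The following plain-Python sketch illustrates that idea under simplifying assumptions; the function names and sample data are made up, and it is not Spark's implementation.

```python
from collections import Counter


def windowed_counts_direct(batch_counts, window_length, slide_interval):
    """Recount every window from scratch."""
    results = []
    for end in range(slide_interval, len(batch_counts) + 1, slide_interval):
        total = Counter()
        for batch in batch_counts[max(0, end - window_length):end]:
            total.update(batch)
        results.append(dict(total))
    return results


def windowed_counts_incremental(batch_counts, window_length, slide_interval):
    """Update the previous window by adding new and subtracting old batches."""
    results = []
    current = Counter()
    for end in range(slide_interval, len(batch_counts) + 1, slide_interval):
        # Reduce step: add the batches that arrived since the last window.
        for batch in batch_counts[end - slide_interval:end]:
            current.update(batch)
        # Inverse reduce step: subtract the batches that slid out.
        old_start = max(0, end - slide_interval - window_length)
        old_end = max(0, end - window_length)
        for batch in batch_counts[old_start:old_end]:
            current.subtract(batch)
        # Drop words whose count fell to zero.
        current = Counter({w: c for w, c in current.items() if c > 0})
        results.append(dict(current))
    return results


# Made-up per-batch word counts; window of 2 batches, sliding by 1 batch.
batch_counts = [{"a": 1}, {"a": 1, "b": 1}, {"b": 2}, {"c": 1}]
incremental = windowed_counts_incremental(batch_counts, 2, 1)
direct = windowed_counts_direct(batch_counts, 2, 1)
print(incremental)
```

Both functions produce the same results; the incremental version touches only the batches that enter or leave the window, which matters when the window is long compared with the sliding interval.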

The following table lists some of the common window operations supported by Spark for DStreams: