Spark Streaming示例

2018-11-26 16:31 更新

一個(gè)快速的Spark Streaming例子

在我們進(jìn)入如何編寫Spark Streaming程序的細(xì)節(jié)之前,讓我們快速地瀏覽一個(gè)簡單的例子。在這個(gè)例子中,程序從監(jiān)聽TCP套接字的數(shù)據(jù)服務(wù)器獲取文本數(shù)據(jù),然后計(jì)算文本中包含的單詞數(shù)。做法如下:

首先,我們導(dǎo)入Spark Streaming的相關(guān)類以及一些從StreamingContext獲得的隱式轉(zhuǎn)換到我們的環(huán)境中,為我們所需的其他類(如DStream)提供有用的方法。StreamingContext是Spark所有流操作的主要入口。然后,我們創(chuàng)建了一個(gè)具有兩個(gè)執(zhí)行線程以及1秒批間隔時(shí)間(即以秒為單位分割數(shù)據(jù)流)的本地StreamingContext。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

利用這個(gè)上下文,我們能夠創(chuàng)建一個(gè)DStream,它表示從TCP源(主機(jī)位localhost,端口為9999)獲取的流式數(shù)據(jù)。

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

這個(gè)lines變量是一個(gè)DStream,表示即將從數(shù)據(jù)服務(wù)器獲得的流數(shù)據(jù)。這個(gè)DStream的每條記錄都代表一行文本。下一步,我們需要將DStream中的每行文本都切分為單詞。

// Split each line into words
val words = lines.flatMap(_.split(" "))

flatMap是一個(gè)一對多的DStream操作,它通過把源DStream的每條記錄都生成多條新記錄來創(chuàng)建一個(gè)新的DStream。在這個(gè)例子中,每行文本都被切分成了多個(gè)單詞,我們把切分的單詞流用words這個(gè)DStream表示。下一步,我們需要計(jì)算單詞的個(gè)數(shù)。

import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

words這個(gè)DStream被mapper(一對一轉(zhuǎn)換操作)成了一個(gè)新的DStream,它由(word,1)對組成。然后,我們就可以用這個(gè)新的DStream計(jì)算每批數(shù)據(jù)的詞頻。最后,我們用wordCounts.print()打印每秒計(jì)算的詞頻。

需要注意的是,當(dāng)以上這些代碼被執(zhí)行時(shí),Spark Streaming僅僅準(zhǔn)備好了它要執(zhí)行的計(jì)算,實(shí)際上并沒有真正開始執(zhí)行。在這些轉(zhuǎn)換操作準(zhǔn)備好之后,要真正執(zhí)行計(jì)算,需要調(diào)用如下的方法

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

完整的例子可以在NetworkWordCount中找到。

如果你已經(jīng)下載和構(gòu)建了Spark環(huán)境,你就能夠用如下的方法運(yùn)行這個(gè)例子。首先,你需要運(yùn)行Netcat作為數(shù)據(jù)服務(wù)器

$ nc -lk 9999

然后,在不同的終端,你能夠用如下方式運(yùn)行例子

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)