W3Cschool
恭喜您成為首批注冊用戶
獲得88經(jīng)驗(yàn)值獎(jiǎng)勵(lì)
在我們進(jìn)入如何編寫Spark Streaming程序的細(xì)節(jié)之前,讓我們快速地瀏覽一個(gè)簡單的例子。在這個(gè)例子中,程序從監(jiān)聽TCP套接字的數(shù)據(jù)服務(wù)器獲取文本數(shù)據(jù),然后計(jì)算文本中包含的單詞數(shù)。做法如下:
首先,我們導(dǎo)入Spark Streaming的相關(guān)類以及一些從StreamingContext獲得的隱式轉(zhuǎn)換到我們的環(huán)境中,為我們所需的其他類(如DStream)提供有用的方法。StreamingContext是Spark所有流操作的主要入口。然后,我們創(chuàng)建了一個(gè)具有兩個(gè)執(zhí)行線程以及1秒批間隔時(shí)間(即以秒為單位分割數(shù)據(jù)流)的本地StreamingContext。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
利用這個(gè)上下文,我們能夠創(chuàng)建一個(gè)DStream,它表示從TCP源(主機(jī)位localhost,端口為9999)獲取的流式數(shù)據(jù)。
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
這個(gè)lines
變量是一個(gè)DStream,表示即將從數(shù)據(jù)服務(wù)器獲得的流數(shù)據(jù)。這個(gè)DStream的每條記錄都代表一行文本。下一步,我們需要將DStream中的每行文本都切分為單詞。
// Split each line into words
val words = lines.flatMap(_.split(" "))
flatMap
是一個(gè)一對多的DStream操作,它通過把源DStream的每條記錄都生成多條新記錄來創(chuàng)建一個(gè)新的DStream。在這個(gè)例子中,每行文本都被切分成了多個(gè)單詞,我們把切分的單詞流用words
這個(gè)DStream表示。下一步,我們需要計(jì)算單詞的個(gè)數(shù)。
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
words
這個(gè)DStream被mapper(一對一轉(zhuǎn)換操作)成了一個(gè)新的DStream,它由(word,1)對組成。然后,我們就可以用這個(gè)新的DStream計(jì)算每批數(shù)據(jù)的詞頻。最后,我們用wordCounts.print()
打印每秒計(jì)算的詞頻。
需要注意的是,當(dāng)以上這些代碼被執(zhí)行時(shí),Spark Streaming僅僅準(zhǔn)備好了它要執(zhí)行的計(jì)算,實(shí)際上并沒有真正開始執(zhí)行。在這些轉(zhuǎn)換操作準(zhǔn)備好之后,要真正執(zhí)行計(jì)算,需要調(diào)用如下的方法
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
完整的例子可以在NetworkWordCount中找到。
如果你已經(jīng)下載和構(gòu)建了Spark環(huán)境,你就能夠用如下的方法運(yùn)行這個(gè)例子。首先,你需要運(yùn)行Netcat作為數(shù)據(jù)服務(wù)器
$ nc -lk 9999
然后,在不同的終端,你能夠用如下方式運(yùn)行例子
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
Copyright©2021 w3cschool編程獅|閩ICP備15016281號(hào)-3|閩公網(wǎng)安備35020302033924號(hào)
違法和不良信息舉報(bào)電話:173-0602-2364|舉報(bào)郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號(hào)
聯(lián)系方式:
更多建議: