輸入DStreams表示從數(shù)據(jù)源獲取輸入數(shù)據(jù)流的DStreams。在快速例子中,lines
表示輸入DStream,它代表從netcat服務器獲取的數(shù)據(jù)流。每一個輸入流DStream和一個Receiver
對象相關聯(lián),這個Receiver
從源中獲取數(shù)據(jù),并將數(shù)據(jù)存入內存中用于處理。
輸入DStreams表示從數(shù)據(jù)源獲取的原始數(shù)據(jù)流。Spark Streaming擁有兩類數(shù)據(jù)源
需要注意的是,如果你想在一個流應用中并行地創(chuàng)建多個輸入DStream來接收多個數(shù)據(jù)流,你能夠創(chuàng)建多個輸入流(這將在性能調優(yōu)那一節(jié)介紹)。它將創(chuàng)建多個Receiver同時接收多個數(shù)據(jù)流。但是,receiver
作為一個長期運行的任務運行在Spark worker或executor中。因此,它占有一個核,這個核是分配給Spark Streaming應用程序的所有核中的一個(it occupies one of the cores allocated to the Spark Streaming application)。所以,為Spark Streaming應用程序分配足夠的核(如果是本地運行,那么是線程)用以處理接收的數(shù)據(jù)并且運行receiver
是非常重要的。
幾點需要注意的地方:
receiver
的輸入DStream將會占用這個核,這樣就沒有剩余的核來處理數(shù)據(jù)了。我們已經在快速例子中看到,ssc.socketTextStream(...)
方法用來把從TCP套接字獲取的文本數(shù)據(jù)創(chuàng)建成DStream。除了套接字,StreamingContext API也支持把文件以及Akka actors作為輸入源創(chuàng)建DStream。
streamingContext.fileStream[keyClass, valueClass, inputFormatClass](dataDirectory)
Spark Streaming將會監(jiān)控dataDirectory
目錄,并且處理目錄下生成的任何文件(嵌套目錄不被支持)。需要注意一下三點:
1 所有文件必須具有相同的數(shù)據(jù)格式
2 所有文件必須在`dataDirectory`目錄下創(chuàng)建,文件是自動的移動和重命名到數(shù)據(jù)目錄下
3 一旦移動,文件必須被修改。所以如果文件被持續(xù)的附加數(shù)據(jù),新的數(shù)據(jù)不會被讀取。
對于簡單的文本文件,有一個更簡單的方法streamingContext.textFileStream(dataDirectory)
可以被調用。文件流不需要運行一個receiver,所以不需要分配核。
在Spark1.2中,fileStream
在Python API中不可用,只有textFileStream
可用。
streamingContext.actorStream(actorProps, actor-name)
方法從Akka actors獲取的數(shù)據(jù)流來創(chuàng)建。具體的信息見自定義receiver指南actorStream
在Python API中不可用。streamingContext.queueStream(queueOfRDDs)
方法基于RDD隊列創(chuàng)建DStreams。每個push到隊列的RDD都被當做DStream的批數(shù)據(jù),像流一樣處理。關于從套接字、文件和actor中獲取流的更多細節(jié),請看StreamingContext和JavaStreamingContext
這類源需要非Spark庫接口,并且它們中的部分還需要復雜的依賴(例如kafka和flume)。為了減少依賴的版本沖突問題,從這些源創(chuàng)建DStream的功能已經被移到了獨立的庫中,你能在關聯(lián)查看細節(jié)。例如,如果你想用來自推特的流數(shù)據(jù)創(chuàng)建DStream,你需要按照如下步驟操作:
spark-streaming-twitter_2.10
到SBT或maven項目的依賴中編寫:導入TwitterUtils
類,用TwitterUtils.createStream
方法創(chuàng)建DStream,如下所示
import org.apache.spark.streaming.twitter._
TwitterUtils.createStream(ssc)
需要注意的是,這些高級的源在spark-shell
中不能被使用,因此基于這些源的應用程序無法在shell中測試。
下面將介紹部分的高級源:
Twitter4j 3.0.3
獲取公共的推文流,這些推文通過推特流API獲得。認證信息可以通過Twitter4J庫支持的任何方法提供。你既能夠得到公共流,也能夠得到基于關鍵字過濾后的流。你可以查看API文檔(scala和java)和例子(TwitterPopularTags和TwitterAlgebirdCMS)在Spark 1.2中,這些源不被Python API支持。輸入DStream也可以通過自定義源創(chuàng)建,你需要做的是實現(xiàn)用戶自定義的receiver
,這個receiver
可以從自定義源接收數(shù)據(jù)以及將數(shù)據(jù)推到Spark中。通過自定義receiver指南了解詳細信息
基于可靠性有兩類數(shù)據(jù)源。源(如kafka、flume)允許。如果從這些可靠的源獲取數(shù)據(jù)的系統(tǒng)能夠正確的應答所接收的數(shù)據(jù),它就能夠確保在任何情況下不丟失數(shù)據(jù)。這樣,就有兩種類型的receiver:
怎樣編寫可靠的Receiver的細節(jié)在自定義receiver中有詳細介紹。
更多建議: