輸入DStreams

2018-02-24 15:57 更新

輸入DStreams和receivers

輸入DStreams表示從數(shù)據(jù)源獲取輸入數(shù)據(jù)流的DStreams。在快速例子中,lines表示輸入DStream,它代表從netcat服務器獲取的數(shù)據(jù)流。每一個輸入流DStream和一個Receiver對象相關聯(lián),這個Receiver從源中獲取數(shù)據(jù),并將數(shù)據(jù)存入內存中用于處理。

輸入DStreams表示從數(shù)據(jù)源獲取的原始數(shù)據(jù)流。Spark Streaming擁有兩類數(shù)據(jù)源

  • 基本源(Basic sources):這些源在StreamingContext API中直接可用。例如文件系統(tǒng)、套接字連接、Akka的actor等。
  • 高級源(Advanced sources):這些源包括Kafka,Flume,Kinesis,Twitter等等。它們需要通過額外的類來使用。我們在關聯(lián)那一節(jié)討論了類依賴。

需要注意的是,如果你想在一個流應用中并行地創(chuàng)建多個輸入DStream來接收多個數(shù)據(jù)流,你能夠創(chuàng)建多個輸入流(這將在性能調優(yōu)那一節(jié)介紹)。它將創(chuàng)建多個Receiver同時接收多個數(shù)據(jù)流。但是,receiver作為一個長期運行的任務運行在Spark worker或executor中。因此,它占有一個核,這個核是分配給Spark Streaming應用程序的所有核中的一個(it occupies one of the cores allocated to the Spark Streaming application)。所以,為Spark Streaming應用程序分配足夠的核(如果是本地運行,那么是線程)用以處理接收的數(shù)據(jù)并且運行receiver是非常重要的。

幾點需要注意的地方:

  • 如果分配給應用程序的核的數(shù)量少于或者等于輸入DStreams或者receivers的數(shù)量,系統(tǒng)只能夠接收數(shù)據(jù)而不能處理它們。
  • 當運行在本地,如果你的master URL被設置成了“l(fā)ocal”,這樣就只有一個核運行任務。這對程序來說是不足的,因為作為receiver的輸入DStream將會占用這個核,這樣就沒有剩余的核來處理數(shù)據(jù)了。

基本源

我們已經在快速例子中看到,ssc.socketTextStream(...)方法用來把從TCP套接字獲取的文本數(shù)據(jù)創(chuàng)建成DStream。除了套接字,StreamingContext API也支持把文件以及Akka actors作為輸入源創(chuàng)建DStream。

  • 文件流(File Streams):從任何與HDFS API兼容的文件系統(tǒng)中讀取數(shù)據(jù),一個DStream可以通過如下方式創(chuàng)建
streamingContext.fileStream[keyClass, valueClass, inputFormatClass](dataDirectory)

Spark Streaming將會監(jiān)控dataDirectory目錄,并且處理目錄下生成的任何文件(嵌套目錄不被支持)。需要注意一下三點:

1 所有文件必須具有相同的數(shù)據(jù)格式
2 所有文件必須在`dataDirectory`目錄下創(chuàng)建,文件是自動的移動和重命名到數(shù)據(jù)目錄下
3 一旦移動,文件必須被修改。所以如果文件被持續(xù)的附加數(shù)據(jù),新的數(shù)據(jù)不會被讀取。

對于簡單的文本文件,有一個更簡單的方法streamingContext.textFileStream(dataDirectory)可以被調用。文件流不需要運行一個receiver,所以不需要分配核。

在Spark1.2中,fileStream在Python API中不可用,只有textFileStream可用。

  • 基于自定義actor的流:DStream可以調用streamingContext.actorStream(actorProps, actor-name)方法從Akka actors獲取的數(shù)據(jù)流來創(chuàng)建。具體的信息見自定義receiver指南actorStream在Python API中不可用。
  • RDD隊列作為數(shù)據(jù)流:為了用測試數(shù)據(jù)測試Spark Streaming應用程序,人們也可以調用streamingContext.queueStream(queueOfRDDs)方法基于RDD隊列創(chuàng)建DStreams。每個push到隊列的RDD都被當做DStream的批數(shù)據(jù),像流一樣處理。

關于從套接字、文件和actor中獲取流的更多細節(jié),請看StreamingContextJavaStreamingContext

高級源

這類源需要非Spark庫接口,并且它們中的部分還需要復雜的依賴(例如kafka和flume)。為了減少依賴的版本沖突問題,從這些源創(chuàng)建DStream的功能已經被移到了獨立的庫中,你能在關聯(lián)查看細節(jié)。例如,如果你想用來自推特的流數(shù)據(jù)創(chuàng)建DStream,你需要按照如下步驟操作:

  • 關聯(lián):添加spark-streaming-twitter_2.10到SBT或maven項目的依賴中
  • 編寫:導入TwitterUtils類,用TwitterUtils.createStream方法創(chuàng)建DStream,如下所示

    import org.apache.spark.streaming.twitter._
    TwitterUtils.createStream(ssc)
  • 部署:將編寫的程序以及其所有的依賴(包括spark-streaming-twitter_2.10的依賴以及它的傳遞依賴)打為jar包,然后部署。這在部署章節(jié)將會作更進一步的介紹。

需要注意的是,這些高級的源在spark-shell中不能被使用,因此基于這些源的應用程序無法在shell中測試。

下面將介紹部分的高級源:

  • Twitter:Spark Streaming利用Twitter4j 3.0.3獲取公共的推文流,這些推文通過推特流API獲得。認證信息可以通過Twitter4J庫支持的任何方法提供。你既能夠得到公共流,也能夠得到基于關鍵字過濾后的流。你可以查看API文檔(scalajava)和例子(TwitterPopularTagsTwitterAlgebirdCMS
  • Flume:Spark Streaming 1.2能夠從flume 1.4.0中獲取數(shù)據(jù),可以查看flume集成指南了解詳細信息
  • Kafka:Spark Streaming 1.2能夠從kafka 0.8.0中獲取數(shù)據(jù),可以查看kafka集成指南了解詳細信息
  • Kinesis:查看Kinesis集成指南了解詳細信息

自定義源

在Spark 1.2中,這些源不被Python API支持。輸入DStream也可以通過自定義源創(chuàng)建,你需要做的是實現(xiàn)用戶自定義的receiver,這個receiver可以從自定義源接收數(shù)據(jù)以及將數(shù)據(jù)推到Spark中。通過自定義receiver指南了解詳細信息

Receiver可靠性

基于可靠性有兩類數(shù)據(jù)源。源(如kafka、flume)允許。如果從這些可靠的源獲取數(shù)據(jù)的系統(tǒng)能夠正確的應答所接收的數(shù)據(jù),它就能夠確保在任何情況下不丟失數(shù)據(jù)。這樣,就有兩種類型的receiver:

  • Reliable Receiver:一個可靠的receiver正確的應答一個可靠的源,數(shù)據(jù)已經收到并且被正確地復制到了Spark中。
  • Unreliable Receiver :這些receivers不支持應答。即使對于一個可靠的源,開發(fā)者可能實現(xiàn)一個非可靠的receiver,這個receiver不會正確應答。

怎樣編寫可靠的Receiver的細節(jié)在自定義receiver中有詳細介紹。

以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號