DStream的輸出操作

2018-02-24 15:58 更新

DStreams上的輸出操作

輸出操作允許DStream的操作推到如數(shù)據(jù)庫、文件系統(tǒng)等外部系統(tǒng)中。因為輸出操作實際上是允許外部系統(tǒng)消費轉(zhuǎn)換后的數(shù)據(jù),它們觸發(fā)的實際操作是DStream轉(zhuǎn)換。目前,定義了下面幾種輸出操作:

Output Operation Meaning
print() 在DStream的每個批數(shù)據(jù)中打印前10條元素,這個操作在開發(fā)和調(diào)試中都非常有用。在Python API中調(diào)用pprint()
saveAsObjectFiles(prefix, [suffix]) 保存DStream的內(nèi)容為一個序列化的文件SequenceFile。每一個批間隔的文件的文件名基于prefixsuffix生成。"prefix-TIME_IN_MS[.suffix]",在Python API中不可用。
saveAsTextFiles(prefix, [suffix]) 保存DStream的內(nèi)容為一個文本文件。每一個批間隔的文件的文件名基于prefixsuffix生成。"prefix-TIME_IN_MS[.suffix]"
saveAsHadoopFiles(prefix, [suffix]) 保存DStream的內(nèi)容為一個hadoop文件。每一個批間隔的文件的文件名基于prefixsuffix生成。"prefix-TIME_IN_MS[.suffix]",在Python API中不可用。
foreachRDD(func) 在從流中生成的每個RDD上應(yīng)用函數(shù)func的最通用的輸出操作。這個函數(shù)應(yīng)該推送每個RDD的數(shù)據(jù)到外部系統(tǒng),例如保存RDD到文件或者通過網(wǎng)絡(luò)寫到數(shù)據(jù)庫中。需要注意的是,func函數(shù)在驅(qū)動程序中執(zhí)行,并且通常都有RDD action在里面推動RDD流的計算。

利用foreachRDD的設(shè)計模式

dstream.foreachRDD是一個強大的原語,發(fā)送數(shù)據(jù)到外部系統(tǒng)中。然而,明白怎樣正確地、有效地用這個原語是非常重要的。下面幾點介紹了如何避免一般錯誤。

  • 經(jīng)常寫數(shù)據(jù)到外部系統(tǒng)需要建一個連接對象(例如到遠(yuǎn)程服務(wù)器的TCP連接),用它發(fā)送數(shù)據(jù)到遠(yuǎn)程系統(tǒng)。為了達(dá)到這個目的,開發(fā)人員可能不經(jīng)意的在Spark驅(qū)動中創(chuàng)建一個連接對象,但是在Spark worker中嘗試調(diào)用這個連接對象保存記錄到RDD中,如下:
  dstream.foreachRDD(rdd => {
      val connection = createNewConnection()  // executed at the driver
      rdd.foreach(record => {
          connection.send(record) // executed at the worker
      })
  })

這是不正確的,因為這需要先序列化連接對象,然后將它從driver發(fā)送到worker中。這樣的連接對象在機(jī)器之間不能傳送。它可能表現(xiàn)為序列化錯誤(連接對象不可序列化)或者初始化錯誤(連接對象應(yīng)該在worker中初始化)等等。正確的解決辦法是在worker中創(chuàng)建連接對象。

  • 然而,這會造成另外一個常見的錯誤-為每一個記錄創(chuàng)建了一個連接對象。例如:
  dstream.foreachRDD(rdd => {
      rdd.foreach(record => {
          val connection = createNewConnection()
          connection.send(record)
          connection.close()
      })
  })

通常,創(chuàng)建一個連接對象有資源和時間的開支。因此,為每個記錄創(chuàng)建和銷毀連接對象會導(dǎo)致非常高的開支,明顯的減少系統(tǒng)的整體吞吐量。一個更好的解決辦法是利用rdd.foreachPartition方法。為RDD的partition創(chuàng)建一個連接對象,用這個兩件對象發(fā)送partition中的所有記錄。

 dstream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
          val connection = createNewConnection()
          partitionOfRecords.foreach(record => connection.send(record))
          connection.close()
      })
  })

這就將連接對象的創(chuàng)建開銷分?jǐn)偟搅藀artition的所有記錄上了。

  • 最后,可以通過在多個RDD或者批數(shù)據(jù)間重用連接對象做更進(jìn)一步的優(yōu)化。開發(fā)者可以保有一個靜態(tài)的連接對象池,重復(fù)使用池中的對象將多批次的RDD推送到外部系統(tǒng),以進(jìn)一步節(jié)省開支。
  dstream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
          // ConnectionPool is a static, lazily initialized pool of connections
          val connection = ConnectionPool.getConnection()
          partitionOfRecords.foreach(record => connection.send(record))
          ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      })
  })

需要注意的是,池中的連接對象應(yīng)該根據(jù)需要延遲創(chuàng)建,并且在空閑一段時間后自動超時。這樣就獲取了最有效的方式發(fā)生數(shù)據(jù)到外部系統(tǒng)。

其它需要注意的地方:

  • 輸出操作通過懶執(zhí)行的方式操作DStreams,正如RDD action通過懶執(zhí)行的方式操作RDD。具體地看,RDD actions和DStreams輸出操作接收數(shù)據(jù)的處理。因此,如果你的應(yīng)用程序沒有任何輸出操作或者用于輸出操作dstream.foreachRDD(),但是沒有任何RDD action操作在dstream.foreachRDD()里面,那么什么也不會執(zhí)行。系統(tǒng)僅僅會接收輸入,然后丟棄它們。
  • 默認(rèn)情況下,DStreams輸出操作是分時執(zhí)行的,它們按照應(yīng)用程序的定義順序按序執(zhí)行。
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號