Spark Streaming Checkpointing

2018-11-26 16:38 更新

Spark Streaming Checkpointing

一個(gè)流應(yīng)用程序必須全天候運(yùn)行,所有必須能夠解決應(yīng)用程序邏輯無(wú)關(guān)的故障(如系統(tǒng)錯(cuò)誤,JVM崩潰等)。為了使這成為可能,Spark Streaming需要checkpoint足夠的信息到容錯(cuò)存儲(chǔ)系統(tǒng)中,以使系統(tǒng)從故障中恢復(fù)。

  • Metadata checkpointing:保存流計(jì)算的定義信息到容錯(cuò)存儲(chǔ)系統(tǒng)如HDFS中。這用來(lái)恢復(fù)應(yīng)用程序中運(yùn)行worker的節(jié)點(diǎn)的故障。元數(shù)據(jù)包括

  • Configuration :創(chuàng)建Spark Streaming應(yīng)用程序的配置信息
  • DStream operations :定義Streaming應(yīng)用程序的操作集合
  • Incomplete batches:操作存在隊(duì)列中的未完成的批

  • Data checkpointing :保存生成的RDD到可靠的存儲(chǔ)系統(tǒng)中,這在有狀態(tài)transformation(如結(jié)合跨多個(gè)批次的數(shù)據(jù))中是必須的。在這樣一個(gè)transformation中,生成的RDD依賴于之前批的RDD,隨著時(shí)間的推移,這個(gè)依賴鏈的長(zhǎng)度會(huì)持續(xù)增長(zhǎng)。在恢復(fù)的過(guò)程中,為了避免這種無(wú)限增長(zhǎng)。有狀態(tài)的transformation的中間RDD將會(huì)定時(shí)地存儲(chǔ)到可靠存儲(chǔ)系統(tǒng)中,以截?cái)噙@個(gè)依賴鏈。

元數(shù)據(jù)checkpoint主要是為了從driver故障中恢復(fù)數(shù)據(jù)。如果transformation操作被用到了,數(shù)據(jù)checkpoint即使在簡(jiǎn)單的操作中都是必須的。

何時(shí)checkpoint

應(yīng)用程序在下面兩種情況下必須開(kāi)啟checkpoint

  • 使用有狀態(tài)的transformation。如果在應(yīng)用程序中用到了updateStateByKey或者reduceByKeyAndWindow,checkpoint目錄必需提供用以定期checkpoint RDD。
  • 從運(yùn)行應(yīng)用程序的driver的故障中恢復(fù)過(guò)來(lái)。使用元數(shù)據(jù)checkpoint恢復(fù)處理信息。

注意,沒(méi)有前述的有狀態(tài)的transformation的簡(jiǎn)單流應(yīng)用程序在運(yùn)行時(shí)可以不開(kāi)啟checkpoint。在這種情況下,從driver故障的恢復(fù)將是部分恢復(fù)(接收到了但是還沒(méi)有處理的數(shù)據(jù)將會(huì)丟失)。這通常是可以接受的,許多運(yùn)行的Spark Streaming應(yīng)用程序都是這種方式。

怎樣配置Checkpointing

在容錯(cuò)、可靠的文件系統(tǒng)(HDFS、s3等)中設(shè)置一個(gè)目錄用于保存checkpoint信息。著可以通過(guò)streamingContext.checkpoint(checkpointDirectory)方法來(lái)做。這運(yùn)行你用之前介紹的有狀態(tài)transformation。另外,如果你想從driver故障中恢復(fù),你應(yīng)該以下面的方式重寫(xiě)你的Streaming應(yīng)用程序。

  • 當(dāng)應(yīng)用程序是第一次啟動(dòng),新建一個(gè)StreamingContext,啟動(dòng)所有Stream,然后調(diào)用start()方法
  • 當(dāng)應(yīng)用程序因?yàn)楣收现匦聠?dòng),它將會(huì)從checkpoint目錄checkpoint數(shù)據(jù)重新創(chuàng)建StreamingContext
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

如果checkpointDirectory存在,上下文將會(huì)利用checkpoint數(shù)據(jù)重新創(chuàng)建。如果這個(gè)目錄不存在,將會(huì)調(diào)用functionToCreateContext函數(shù)創(chuàng)建一個(gè)新的上下文,建立DStreams。請(qǐng)看RecoverableNetworkWordCount例子。

除了使用getOrCreate,開(kāi)發(fā)者必須保證在故障發(fā)生時(shí),driver處理自動(dòng)重啟。只能通過(guò)部署運(yùn)行應(yīng)用程序的基礎(chǔ)設(shè)施來(lái)達(dá)到該目的。在部署章節(jié)將有更進(jìn)一步的討論。

注意,RDD的checkpointing有存儲(chǔ)成本。這會(huì)導(dǎo)致批數(shù)據(jù)(包含的RDD被checkpoint)的處理時(shí)間增加。因此,需要小心的設(shè)置批處理的時(shí)間間隔。在最小的批容量(包含1秒的數(shù)據(jù))情況下,checkpoint每批數(shù)據(jù)會(huì)顯著的減少操作的吞吐量。相反,checkpointing太少會(huì)導(dǎo)致譜系以及任務(wù)大小增大,這會(huì)產(chǎn)生有害的影響。因?yàn)橛袪顟B(tài)的transformation需要RDD checkpoint。默認(rèn)的間隔時(shí)間是批間隔時(shí)間的倍數(shù),最少10秒。它可以通過(guò)dstream.checkpoint來(lái)設(shè)置。典型的情況下,設(shè)置checkpoint間隔是DStream的滑動(dòng)間隔的5-10大小是一個(gè)好的嘗試。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)