W3Cschool
恭喜您成為首批注冊(cè)用戶
獲得88經(jīng)驗(yàn)值獎(jiǎng)勵(lì)
一個(gè)流應(yīng)用程序必須全天候運(yùn)行,所有必須能夠解決應(yīng)用程序邏輯無(wú)關(guān)的故障(如系統(tǒng)錯(cuò)誤,JVM崩潰等)。為了使這成為可能,Spark Streaming需要checkpoint足夠的信息到容錯(cuò)存儲(chǔ)系統(tǒng)中,以使系統(tǒng)從故障中恢復(fù)。
Metadata checkpointing:保存流計(jì)算的定義信息到容錯(cuò)存儲(chǔ)系統(tǒng)如HDFS中。這用來(lái)恢復(fù)應(yīng)用程序中運(yùn)行worker的節(jié)點(diǎn)的故障。元數(shù)據(jù)包括
Incomplete batches:操作存在隊(duì)列中的未完成的批
元數(shù)據(jù)checkpoint主要是為了從driver故障中恢復(fù)數(shù)據(jù)。如果transformation操作被用到了,數(shù)據(jù)checkpoint即使在簡(jiǎn)單的操作中都是必須的。
應(yīng)用程序在下面兩種情況下必須開(kāi)啟checkpoint
updateStateByKey
或者reduceByKeyAndWindow
,checkpoint目錄必需提供用以定期checkpoint RDD。注意,沒(méi)有前述的有狀態(tài)的transformation的簡(jiǎn)單流應(yīng)用程序在運(yùn)行時(shí)可以不開(kāi)啟checkpoint。在這種情況下,從driver故障的恢復(fù)將是部分恢復(fù)(接收到了但是還沒(méi)有處理的數(shù)據(jù)將會(huì)丟失)。這通常是可以接受的,許多運(yùn)行的Spark Streaming應(yīng)用程序都是這種方式。
在容錯(cuò)、可靠的文件系統(tǒng)(HDFS、s3等)中設(shè)置一個(gè)目錄用于保存checkpoint信息。著可以通過(guò)streamingContext.checkpoint(checkpointDirectory)
方法來(lái)做。這運(yùn)行你用之前介紹的有狀態(tài)transformation。另外,如果你想從driver故障中恢復(fù),你應(yīng)該以下面的方式重寫(xiě)你的Streaming應(yīng)用程序。
start()
方法// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
如果checkpointDirectory
存在,上下文將會(huì)利用checkpoint數(shù)據(jù)重新創(chuàng)建。如果這個(gè)目錄不存在,將會(huì)調(diào)用functionToCreateContext
函數(shù)創(chuàng)建一個(gè)新的上下文,建立DStreams。請(qǐng)看RecoverableNetworkWordCount例子。
除了使用getOrCreate
,開(kāi)發(fā)者必須保證在故障發(fā)生時(shí),driver處理自動(dòng)重啟。只能通過(guò)部署運(yùn)行應(yīng)用程序的基礎(chǔ)設(shè)施來(lái)達(dá)到該目的。在部署章節(jié)將有更進(jìn)一步的討論。
注意,RDD的checkpointing有存儲(chǔ)成本。這會(huì)導(dǎo)致批數(shù)據(jù)(包含的RDD被checkpoint)的處理時(shí)間增加。因此,需要小心的設(shè)置批處理的時(shí)間間隔。在最小的批容量(包含1秒的數(shù)據(jù))情況下,checkpoint每批數(shù)據(jù)會(huì)顯著的減少操作的吞吐量。相反,checkpointing太少會(huì)導(dǎo)致譜系以及任務(wù)大小增大,這會(huì)產(chǎn)生有害的影響。因?yàn)橛袪顟B(tài)的transformation需要RDD checkpoint。默認(rèn)的間隔時(shí)間是批間隔時(shí)間的倍數(shù),最少10秒。它可以通過(guò)dstream.checkpoint
來(lái)設(shè)置。典型的情況下,設(shè)置checkpoint間隔是DStream的滑動(dòng)間隔的5-10大小是一個(gè)好的嘗試。
Copyright©2021 w3cschool編程獅|閩ICP備15016281號(hào)-3|閩公網(wǎng)安備35020302033924號(hào)
違法和不良信息舉報(bào)電話:173-0602-2364|舉報(bào)郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號(hào)
聯(lián)系方式:
更多建議: