Spark RDD 操作

2018-11-26 16:29 更新

Spark RDD 操作

RDDs 支持 2 種類型的操作:轉(zhuǎn)換(transformations) 從已經(jīng)存在的數(shù)據(jù)集中創(chuàng)建一個新的數(shù)據(jù)集;動作(actions) 在數(shù)據(jù)集上進行計算之后返回一個值到驅(qū)動程序。例如,map 是一個轉(zhuǎn)換操作,它將每一個數(shù)據(jù)集元素傳遞給一個函數(shù)并且返回一個新的 RDD。另一方面,reduce 是一個動作,它使用相同的函數(shù)來聚合 RDD 的所有元素,并且將最終的結(jié)果返回到驅(qū)動程序(不過也有一個并行 reduceByKey 能返回一個分布式數(shù)據(jù)集)。

在 Spark 中,所有的轉(zhuǎn)換(transformations)都是惰性(lazy)的,它們不會馬上計算它們的結(jié)果。相反的,它們僅僅記錄轉(zhuǎn)換操作是應(yīng)用到哪些基礎(chǔ)數(shù)據(jù)集(例如一個文件)上的。轉(zhuǎn)換僅僅在這個時候計算:當動作(action) 需要一個結(jié)果返回給驅(qū)動程序的時候。這個設(shè)計能夠讓 Spark 運行得更加高效。例如,我們可以實現(xiàn):通過 map 創(chuàng)建一個新數(shù)據(jù)集在 reduce 中使用,并且僅僅返回 reduce 的結(jié)果給 driver,而不是整個大的映射過的數(shù)據(jù)集。

默認情況下,每一個轉(zhuǎn)換過的 RDD 會在每次執(zhí)行動作(action)的時候重新計算一次。然而,你也可以使用 persist (或 cache)方法持久化(persist)一個 RDD 到內(nèi)存中。在這個情況下,Spark 會在集群上保存相關(guān)的元素,在你下次查詢的時候會變得更快。在這里也同樣支持持久化 RDD 到磁盤,或在多個節(jié)點間復(fù)制。

基礎(chǔ)

為了說明 RDD 基本知識,考慮下面的簡單程序:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

第一行是定義來自于外部文件的 RDD。這個數(shù)據(jù)集并沒有加載到內(nèi)存或做其他的操作:lines 僅僅是一個指向文件的指針。第二行是定義 lineLengths,它是 map 轉(zhuǎn)換(transformation)的結(jié)果。同樣,lineLengths 由于懶惰模式也沒有立即計算。最后,我們執(zhí)行 reduce,它是一個動作(action)。在這個地方,Spark 把計算分成多個任務(wù)(task),并且讓它們運行在多個機器上。每臺機器都運行自己的 map 部分和本地 reduce 部分。然后僅僅將結(jié)果返回給驅(qū)動程序。

如果我們想要再次使用 lineLengths,我們可以添加:

lineLengths.persist()

reduce 之前,它會導(dǎo)致 lineLengths 在第一次計算完成之后保存到內(nèi)存中。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號