W3Cschool
恭喜您成為首批注冊用戶
獲得88經(jīng)驗值獎勵
一般情況下,當(dāng)一個傳遞給Spark操作(例如map和reduce)的函數(shù)在遠(yuǎn)程節(jié)點上面運行時,Spark操作實際上操作的是這個函數(shù)所用變量的一個獨立副本。這些變量被復(fù)制到每臺機(jī)器上,并且這些變量在遠(yuǎn)程機(jī)器上的所有更新都不會傳遞回驅(qū)動程序。通??缛蝿?wù)的讀寫變量是低效的,但是,Spark還是為兩種常見的使用模式提供了兩種有限的共享變量:廣播變量(broadcast variable)和累加器(accumulator)
廣播變量允許程序員緩存一個只讀的變量在每臺機(jī)器上面,而不是每個任務(wù)保存一份拷貝。例如,利用廣播變量,我們能夠以一種更有效率的方式將一個大數(shù)據(jù)量輸入集合的副本分配給每個節(jié)點。(Broadcast variables allow theprogrammer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.They can be used, for example,to give every node a copy of a large input dataset in an efficient manner.)Spark也嘗試著利用有效的廣播算法去分配廣播變量,以減少通信的成本。
一個廣播變量可以通過調(diào)用SparkContext.broadcast(v)
方法從一個初始變量v中創(chuàng)建。廣播變量是v的一個包裝變量,它的值可以通過value
方法訪問,下面的代碼說明了這個過程:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
廣播變量創(chuàng)建以后,我們就能夠在集群的任何函數(shù)中使用它來代替變量v,這樣我們就不需要再次傳遞變量v到每個節(jié)點上。另外,為了保證所有的節(jié)點得到廣播變量具有相同的值,對象v不能在廣播之后被修改。
顧名思義,累加器是一種只能通過關(guān)聯(lián)操作進(jìn)行“加”操作的變量,因此它能夠高效的應(yīng)用于并行操作中。它們能夠用來實現(xiàn)counters
和sums
。Spark原生支持?jǐn)?shù)值類型的累加器,開發(fā)者可以自己添加支持的類型。如果創(chuàng)建了一個具名的累加器,它可以在spark的UI中顯示。這對于理解運行階段(running stages)的過程有很重要的作用。(注意:這在python中還不被支持)
一個累加器可以通過調(diào)用SparkContext.accumulator(v)
方法從一個初始變量v中創(chuàng)建。運行在集群上的任務(wù)可以通過add
方法或者使用+=
操作來給它加值。然而,它們無法讀取這個值。只有驅(qū)動程序可以使用value
方法來讀取累加器的值。如下的代碼,展示了如何利用累加器將一個數(shù)組里面的所有元素相加:
scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
這個例子利用了內(nèi)置的整數(shù)類型累加器。開發(fā)者可以利用子類AccumulatorParam創(chuàng)建自己的累加器類型。AccumulatorParam接口有兩個方法:zero
方法為你的數(shù)據(jù)類型提供一個“0 值”(zero value);addInPlace
方法計算兩個值的和。例如,假設(shè)我們有一個Vector
類代表數(shù)學(xué)上的向量,我們能夠如下定義累加器:
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
在scala中,Spark支持用更一般的Accumulable接口來累積數(shù)據(jù)-結(jié)果類型和用于累加的元素類型不一樣(例如通過收集的元素建立一個列表)。Spark也支持用SparkContext.accumulableCollection
方法累加一般的scala集合類型。
Copyright©2021 w3cschool編程獅|閩ICP備15016281號-3|閩公網(wǎng)安備35020302033924號
違法和不良信息舉報電話:173-0602-2364|舉報郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號
聯(lián)系方式:
更多建議: