Spark GraphX頂點(diǎn)和邊RDDs

2018-11-26 16:36 更新

Spark GraphX頂點(diǎn)和邊RDDs

GraphX暴露保存在圖中的頂點(diǎn)和邊的RDD。然而,因?yàn)镚raphX包含的頂點(diǎn)和邊擁有優(yōu)化的數(shù)據(jù)結(jié)構(gòu),這些數(shù)據(jù)結(jié)構(gòu)提供了額外的功能。頂點(diǎn)和邊分別返回VertexRDDEdgeRDD。這一章我們將學(xué)習(xí)它們的一些有用的功能。

VertexRDDs

VertexRDD[A]繼承自RDD[(VertexID, A)]并且添加了額外的限制,那就是每個(gè)VertexID只能出現(xiàn)一次。此外,VertexRDD[A]代表了一組屬性類型為A的頂點(diǎn)。在內(nèi)部,這通過(guò)保存頂點(diǎn)屬性到一個(gè)可重復(fù)使用的hash-map數(shù)據(jù)結(jié)構(gòu)來(lái)獲得。所以,如果兩個(gè)VertexRDDs從相同的基本VertexRDD獲得(如通過(guò)filter或者mapValues),它們能夠在固定的時(shí)間內(nèi)連接而不需要hash評(píng)價(jià)。為了利用這個(gè)索引數(shù)據(jù)結(jié)構(gòu),VertexRDD暴露了一下附加的功能:

class VertexRDD[VD] extends RDD[(VertexID, VD)] {
  // Filter the vertex set but preserves the internal index
  def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
  // Transform the values without changing the ids (preserves the internal index)
  def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
  // Remove vertices from this set that appear in the other set
  def diff(other: VertexRDD[VD]): VertexRDD[VD]
  // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
  def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
  def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}

舉個(gè)例子,filter操作如何返回一個(gè)VertexRDD。過(guò)濾器實(shí)際使用一個(gè)BitSet實(shí)現(xiàn),因此它能夠重用索引以及保留和其它VertexRDDs做連接時(shí)速度快的能力。同樣的,mapValues操作不允許map函數(shù)改變VertexID,因此可以保證相同的HashMap數(shù)據(jù)結(jié)構(gòu)能夠重用。當(dāng)連接兩個(gè)從相同的hashmap獲取的VertexRDDs和使用線性掃描而不是昂貴的點(diǎn)查找實(shí)現(xiàn)連接操作時(shí),leftJoininnerJoin都能夠使用。

從一個(gè)RDD[(VertexID, A)]高效地構(gòu)建一個(gè)新的VertexRDDaggregateUsingIndex操作是有用的。概念上,如果我通過(guò)一組頂點(diǎn)構(gòu)造了一個(gè)VertexRDD[B],而VertexRDD[B]是一些RDD[(VertexID, A)]中頂點(diǎn)的超集,那么我們就可以在聚合以及隨后索引RDD[(VertexID, A)]中重用索引。例如:

val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

EdgeRDDs

EdgeRDD[ED]繼承自RDD[Edge[ED]],使用定義在PartitionStrategy的各種分區(qū)策略中的一個(gè)在塊分區(qū)中組織邊。在每個(gè)分區(qū)中,邊屬性和相鄰結(jié)構(gòu)被分別保存,當(dāng)屬性值改變時(shí),它們可以最大化的重用。

EdgeRDD暴露了三個(gè)額外的函數(shù)

// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Revere the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

在大多數(shù)的應(yīng)用中,我們發(fā)現(xiàn),EdgeRDD操作可以通過(guò)圖操作者(graph operators)或者定義在基本RDD中的操作來(lái)完成。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)