分佈式圖計算是個啥?一文帶你理解並實戰Spark GraphX

Spark是一種大規模、快速計算的集群平臺,本頭條號試圖通過學習Spark官網的實戰演練筆記提升筆者實操能力以及展現Spark的精彩之處。有關框架介紹和環境配置可以參考以下內容:

一、屬性圖

GraphX是Spark中用於圖形圖形並行計算的新組件。在較高的層次上,GraphX 通過引入新的Graph抽象來擴展Spark RDD:一個有向多重圖,其屬性附加到每個頂點和邊上。為了支持圖計算,GraphX公開了一組基本的操作符(例如,子圖joinVerticesaggregateMessages),以及優化的Pregel API。此外,GraphX包括越來越多的圖形算法和構建器集合,以簡化圖形分析任務。

1.圖計算入門

首先,首先需要將Spark和GraphX導入項目,如下所示:

<code>import org.apache.spark._
import org.apache.spark.graphx._
// 圖計算也需要RDD
import org.apache.spark.rdd.RDD/<code>

屬性圖是一個定向多重圖形,用戶定義的對象附加到每個頂點和邊緣。定向多圖是具有共享相同源和目標頂點的潛在多個平行邊緣的有向圖。支持平行邊緣的能力簡化了在相同頂點之間可以有多個關係(例如:同事和朋友)的建模場景。每個頂點都由唯一的 64 位長標識符(VertexId)鍵入。GraphX 不對頂點標識符施加任何排序約束。類似地,邊緣具有對應的源和目標頂點標識符。

屬性圖是通過 vertex(VD) edge(ED)類型進行參數化的。這些是分別與每個頂點和邊緣相關聯的對象的類型。

在某些情況下,可能希望在同一個圖形中具有不同屬性類型的頂點。這可以通過繼承來實現。例如,將用戶和產品建模為二分圖,我們可能會執行以下操作:

<code>class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty

case class ProductProperty(val name: String, val price: Double) extends VertexProperty
var graph: Graph[VertexProperty, String] = null/<code>

像 RDD 一樣,屬性圖是不可變的,分佈式的和容錯的。通過生成具有所需更改的新圖形來完成對圖表的值或結構的更改。請注意,原始圖形的大部分(即,未受影響的結構,屬性和索引)在新圖表中重複使用,可降低此內在功能數據結構的成本。使用一系列頂點分割啟發式方法,在執行器之間劃分圖形。與 RDD 一樣,在發生故障的情況下,可以在不同的機器上重新創建圖形的每個分區。

邏輯上,屬性圖對應於一對編碼每個頂點和邊緣的屬性的類型集合(RDD)。因此,圖類包含訪問圖形頂點和邊的成員:

<code>class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
}/<code>

VertexRDD[VD]EdgeRDD[ED]分別擴展了RDD[(VertexId, VD)] RDD[Edge[ED]] 的優化版本。

VertexRDD[VD] EdgeRDD[ED] 都提供了圍繞圖計算和利用內部優化的附加功能。

2.屬性圖示例

假設我們要構建一個由 GraphX 項目中的各種協作者組成的屬性圖。頂點屬性可能包含用戶名和職業。我們可以用描述協作者之間關係的字符串來註釋邊:

分佈式圖計算是個啥?一文帶你理解並實戰Spark GraphX

從原始文件合成生成器構建屬性圖有許多方法,這些在圖形構建器的一節中有更詳細的討論。最普遍的方法是使用Graph對象。例如,以下代碼從RDD集合中構建一個圖:

<code>// 定義頂點
val Buildings: RDD[(VertexId, (String, String))] =sc.parallelize(Array((3L, ("中南樓", "行政樓")), (7L, ("文泰樓", "教學樓")),(5L, ("中原樓", "行政樓")), (2L, ("文波樓", "行政樓"))))
// 定義邊
scala> val relationships: RDD[Edge[String]] =sc.parallelize(Array(Edge(3L, 7L, "不同校區不同性質"), Edge(5L, 3L, "不同校區同性質"), Edge(2L, 5L, "同校區不同性質"), Edge(5L, 7L, "同校區不同性質")))
// 定義默認樓棟
val defaultBuilding = ("文添樓", "教學樓")
// 初始化圖
val graph = Graph(Buildings, relationships, defaultBuilding)/<code>

我們可以分別使用 graph.vertices graph.edges 成員將圖形解構成相應的頂點和邊緣視圖。

<code>graph.vertices.filter { case (id, (name, pos)) => pos == "行政樓" }.count
graph.edges.filter(e => e.srcId > e.dstId).count/<code>

除了屬性圖的頂點和邊緣視圖之外,GraphX 還暴露了三元組視圖。三元組在視圖邏輯上

連接頂點邊緣屬性,生成 RDD[EdgeTriplet[VD, ED]] 包含 EdgeTriplet 該類的實例。EdgeTriplet 類通過分別添加包含源和目標屬性的 srcAttrdstAttr 成員來擴展 Edge 類。我們可以使用圖形的三元組視圖來渲染描述用戶之間關係的字符串集合。

<code>val facts: RDD[String] = graph.triplets.map(triplet => triplet.srcAttr._1 + " 與 " + triplet.dstAttr._1+ " 是 " + triplet.attr + "的樓")
facts.collect.foreach(println(_))/<code>

二、圖運算符

1.運算符列表

以下是兩個定義的功能的簡要摘要,但為簡單起見將GraphGraphOps 作為 Graph 的成員呈現。

<code>// 屬性圖中的函數總結
class Graph[VD, ED] {
// 圖的基本信息變量

val numEdges: Long
val numVertices: Long
val inDegrees: VertexRDD[Int]
​ val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]
// 圖的集合
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]
// 有關緩存的函數
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def cache(): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
// 改變分區方式
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
// 轉換頂點和邊屬性
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
: Graph[VD, ED2]
// 修改圖結構
def reverse: Graph[VD, ED]
def subgraph(
epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (VertexId, VD) => Boolean = ((v, d) => true))
: Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
// 連接RDD和圖
def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
(mapFunc: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
// 彙總有關相鄰三元組的信息
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All
)
: VertexRDD[A]
// 迭代圖遍歷計算

def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
mergeMsg: (A, A) => A
)
: Graph[VD, ED]
// 基本的圖算法
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def connectedComponents(): Graph[VertexId, ED]
def triangleCount(): Graph[Int, ED]
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}   /<code>

2.屬性運算符

RDD map 運算符一樣,屬性圖包含以下內容:

<code>class Graph[VD, ED] {
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}/<code>

這些運算符中的每一個都會產生一個新圖形,該圖形的頂點或邊屬性由用戶定義的map函數修改。

3.結構運算符

目前GraphX只支持一套簡單的常用結構運算符,我們預計將來會增加更多。以下是基本結構運算符的列表。

<code>class Graph[VD, ED] {
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
vpred: (VertexId, VD) => Boolean): Graph[VD, ED]

def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}/<code>
  • reverse 運算符將返回逆轉的所有邊緣方向上的新圖。這在例如嘗試計算逆 PageRank 時是有用的。由於反向操作不會修改頂點或邊緣屬性或更改邊緣數量,因此可以在沒有數據移動或重複的情況下高效地實現。
  • subgraph 操作者需要的頂點和邊緣的謂詞,並返回包含只有滿足謂詞頂點的頂點的曲線圖(評估為真),並且滿足謂詞邊緣邊緣 _並連接滿足頂點謂詞頂點_。所述 subgraph 操作員可在情況編號被用來限制圖形以頂點和感興趣的邊緣或消除斷開的鏈接。
  • mask 操作者通過返回包含該頂點和邊,它們也在輸入圖形中發現的曲線構造一個子圖。這可以與 subgraph 運算符一起使用,以便根據另一個相關圖中的屬性限制圖形。例如,我們可以使用缺少頂點的圖運行連接的組件,然後將答案限制為有效的子圖。
  • groupEdges
    操作符將多邊形中的平行邊(即,頂點對之間的重複邊)合併。在許多數值應用中,可以將平行邊緣(它們的權重組合)合併成單個邊緣,從而減小圖形的大小。

4.Join運算符

在許多情況下,有必要使用圖形連接來自外部收集(RDD)的數據。例如,我們可能有額外的用戶屬性,我們要與現有的圖形合併,或者我們可能希望將頂點屬性從一個圖形拉到另一個。這些任務可以使用 join 運算符完成。下面我們列出關鍵 join 運算符:

<code>class Graph[VD, ED] {
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
: Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}/<code>
  • joinVertices 操作符將頂點與輸入 RDD 相連,並返回一個新的圖形,其中通過將用戶定義的 map 函數應用於已連接頂點的結果而獲得的頂點屬性。RDD 中沒有匹配值的頂點保留其原始值。
  • 除了將用戶定義的 map 函數應用於所有頂點並且可以更改頂點屬性類型之外,更一般的
    outerJoinVertices 的行為類似於 joinVertices。因為不是所有的頂點都可能在輸入 RDD 中具有匹配的值,所以 map 函數採用 Option 類型。

5.鄰域聚合

許多圖形分析任務的關鍵步驟是聚合關於每個頂點鄰域的信息。例如,我們可能想知道每個用戶擁有的關注者數量或每個用戶的追隨者的平均年齡。許多迭代圖表算法(例如:網頁級別,最短路徑,以及連接成分)相鄰頂點(例如:電流值的 PageRank,最短到源路徑,和最小可達頂點 ID)的重複聚合性質。

GraphX 中的核心聚合操作是 aggregateMessages(聚合消息)。該運算符將用戶定義的 sendMsg 函數應用於圖中的每個 邊緣三元組,然後使用該 mergeMsg 函數在其目標頂點聚合這些消息。

<code>class Graph[VD, ED] {
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[Msg]
}/<code>

用戶定義的

sendMsg 函數接受一個 EdgeContext,它將源和目標屬性以及 edge 屬性和函數 (sendToSrcsendToDst) 一起發送到源和目標屬性。在 map-reduce 中,將 sendMsg 作為 map 函數。用戶定義的 mergeMsg 函數需要兩個發往同一頂點的消息,併產生一條消息。想想 mergeMsg 是 map-reduce 中的 reduce 函數。aggregateMessages 運算符返回一個 VertexRDD[Msg],其中包含去往每個頂點的聚合消息(Msg類型)。沒有收到消息的頂點不包括在返回的 VertexRDDVertexRDD 中。

三、Pregel API

圖形是固有的遞歸數據結構,因為頂點的屬性取決於其鄰居的屬性,而鄰居的屬性又依賴於 其 鄰居的屬性。因此,許多重要的圖算法迭代地重新計算每個頂點的屬性,直到達到一個固定點條件。已經提出了一系列圖並行抽象來表達這些迭代算法。GraphX 公開了 Pregel API 的變體。在以下示例中,我們可以使用 Pregel 運算符來表達單源最短路徑的計算:

<code>import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
// 帶有包含距離屬性的邊的圖
val graph: Graph[Long, Double] = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // 最終來源
// 初始化圖
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // 頂點程序
triplet => { // 發送消息
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
} },(a, b) => math.min(a, b) // 合併消息)
println(sssp.vertices.collect.mkString("\\n"))/<code>

四、圖算法

GraphX 包括一組簡化分析任務的圖算法。該算法被包含在 org.apache.spark.graphx.lib 包可直接作為方法通過 GraphOps來訪問 Graph。本節介紹算法及其使用方法。

1.PageRank

PageRank 測量在圖中每個頂點的重要性,假設從邊緣 u 到 v 表示的認可 v 通過的重要性 u。例如,如果 Twitter 用戶遵循許多其他用戶,則用戶將被高度排名。

GraphX 附帶了 PageRank 的靜態動態

實現方法作PageRank 對象上的方法。靜態 PageRank 運行固定次數的迭代,而動態 PageRank 運行直到排列收斂(即,停止改變超過指定的公差)。GraphOps 允許直接調用這些算法作為方法 Graph。

GraphX還包括一個可以運行 PageRank 的社交網絡數據集示例。給出了一組用戶data/graphx/users.txt,並給出了一組用戶之間的關係 data/graphx/followers.txt。我們計算每個用戶的 PageRank 如下:

<code>import org.apache.spark.graphx.GraphLoader

val Graph = GraphLoader.edgeListFile(sc,"file:///usr/local/spark/data/graphx/followers.txt")
val ranks = graph.pageRank(0.0001).vertices
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val users = sc.textFile("file:///usr/local/spark/data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
println(ranksByUsername.collect().mkString("\\n"))/<code>

2.連接組件

連接的組件算法將圖中每個連接的組件與其最低編號頂點的ID進行標記。例如,在社交網絡中,連接的組件可以近似群集。GraphX包含ConnectedComponents object 中算法的實現,我們從

PageRank 部分計算示例社交網絡數據集的連接組件如下:

<code>import org.apache.spark.graphx.GraphLoader
// 加載PageRank官方示例文件
val Graph = GraphLoader.edgeListFile(sc,"file:///usr/local/spark/data/graphx/followers.txt")
// 查找連接組件
val cc = graph.connectedComponents().vertices
// 與帶有用戶名的連接組件Jion
sc.textFile("file:///usr/local/spark/data/graphx/users.txt").collect()
val users = sc.textFile("file:///usr/local/spark/data/graphx/users.txt").map {line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
case (id, (username, cc)) => (username, cc)
}
// 輸出結果
println(ccByUsername.collect().mkString("\\n"))/<code>

3.Triangle計數

頂點是三角形的一部分,當它有兩個相鄰的頂點之間有一個邊。GraphX 在 TriangleCount 對象中實現一個三角計數算法,用於確定通過每個頂點的三角形數量,提供聚類度量。我們從PageRank 部分計算社交網絡數據集的三角形數。需要注意的是 TriangleCount 邊緣要處於規範方向(srcId & dstId),而圖形要使用 Graph.partitionBy

<code>import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
// 以規範順序加載邊線並劃分圖形以進行三角形計數
val graph = GraphLoader.edgeListFile(sc, "file:///usr/local/spark/data/graphx/followers.txt", true)
val graph = GraphLoader.edgeListFile(sc, "file:///usr/local/spark/data/graphx/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
// 找到每個頂點的三角形數
val triCounts = graph.triangleCount().vertices
// 使用用戶名Join三角形數
val users = sc.textFile("file:///usr/local/spark/data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) => (username, tc)}

scala> println(triCountByUsername.collect().mkString("\\n"))

(justinbieber,0)
(matei_zaharia,1)
(ladygaga,0)
(BarackObama,0)
(jeresig,1)
(odersky,1)/<code>

至此Spark GraphX圖計算講解完成,Spark大數據分佈式處理實戰筆記也到此結束。之後可能在此基礎上繼續延伸,更新Spark MLlib源碼解析系列。此係列將機器學習算法與分佈式架構結合在一起,使得AI人工智能算法在大數據場景下發揮更大的作用。前文筆記請參考下面的鏈接:


分享到:


相關文章: