Spark GraphX API Operations

Like RDDs, graphs come with a set of basic operators; additional operators live in the GraphOps class but are also accessible as members of Graph.

All of the operations below can be found in the Graph class.

1. Property Operations

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

Property operations modify vertex and edge attributes.

Each operation produces a new graph whose vertex or edge attributes have been transformed by a user-defined map function.

The example below modifies attributes with the mapVertices, mapEdges, mapTriplets, and mapReduceTriplets operations: mapVertices modifies vertex attributes, mapEdges modifies edge attributes, and mapTriplets operates on the triplet as a whole, i.e. srcAttr, attr, and dstAttr are all available for computing the new edge attribute.

import java.text.SimpleDateFormat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.{VertexRDD, EdgeTriplet, Graph, Edge}
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by lichangyue on 2016/9/18.
 */
object TestPropsOps {

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

    val conf = new SparkConf()
    val sc = new SparkContext("local", "test", conf)

    // subgraph-vertices.csv:
    // 1,Taro,100
    // 2,Jiro,200
    // 3,Sabo,300
    val vertexLines = sc.textFile("hdfs://S7SA053:8020/stat/subgraph-vertices.csv")
    val vertices = vertexLines.map(line => {
      val cols = line.split(",")
      (cols(0).trim.toLong, (cols(1).trim, cols(2).trim.toLong))
    })

    val format = new SimpleDateFormat("yyyy/MM/dd")

    // subgraph-edges.csv:
    // 1,2,100,2014/12/1
    // 2,3,200,2014/12/2
    // 3,1,300,2014/12/3
    val edgesLines = sc.textFile("hdfs://S7SA053:8020/stat/subgraph-edges.csv")
    val edges = edgesLines.map(line => {
      val cols = line.split(",")
      Edge(cols(0).toLong, cols(1).toLong, (cols(2).toLong, format.parse(cols(3).trim)))
    })

    // Build the graph
    val graph = Graph(vertices, edges)

    println("\n\nConfirm edges internal of graph")
    graph.edges.collect.foreach(println(_))
    // Edge(1,2,(100,Mon Dec 01 00:00:00 EST 2014))
    // Edge(2,3,(200,Tue Dec 02 00:00:00 EST 2014))
    // Edge(3,1,(300,Wed Dec 03 00:00:00 EST 2014))

    println("\nconfirm vertices internal of graph")
    graph.vertices.collect.foreach(println(_))
    // (2,(Jiro,200))
    // (1,(Taro,100))
    // (3,(Sabo,300))

    // Use mapVertices to change each vertex attribute from (String, Long)
    // to (length of the String) * (the Long value)
    val graph2 = graph.mapVertices((vid, attr) => attr._1.length * attr._2)
    println("\n\nconfirm vertices internal of graph2")
    graph2.vertices.collect.foreach(println(_))
    // (2,800)   "Jiro".length is 4; 4 * 200 = 800, and so on
    // (1,400)
    // (3,1200)

    // Use mapEdges to change each edge attribute from
    // (100,Mon Dec 01 00:00:00 EST 2014) to just 100
    val graph3 = graph.mapEdges(edge => edge.attr._1)
    println("\n\nconfirm edges internal of graph3")
    graph3.edges.collect.foreach(println(_))
    // Edge(1,2,100)
    // Edge(2,3,200)
    // Edge(3,1,300)

    println("\n\nconfirm triplets internal of graph")
    graph.triplets.collect.foreach(println(_))
    // ((1,(Taro,100)),(2,(Jiro,200)),(100,Mon Dec 01 00:00:00 EST 2014))
    // ((2,(Jiro,200)),(3,(Sabo,300)),(200,Tue Dec 02 00:00:00 EST 2014))
    // ((3,(Sabo,300)),(1,(Taro,100)),(300,Wed Dec 03 00:00:00 EST 2014))
    // Note that none of the operations above changed graph itself

    // Use mapTriplets to operate on the triplet as a whole, i.e. use
    // srcAttr, attr, and dstAttr to compute the new edge attribute
    val graph4 = graph.mapTriplets(edge => edge.srcAttr._2 + edge.attr._1 + edge.dstAttr._2)
    println("\n\nconfirm edges internal of graph4")
    graph4.edges.collect.foreach(println(_))
    // Edge(1,2,400)   400 = 100 + 100 + 200
    // Edge(2,3,700)
    // Edge(3,1,700)

    // Use mapReduceTriplets to produce a new VertexRDD:
    // the map function operates on each triplet,
    // the reduce function merges the values sent to the same vertex id
    val newVertices: VertexRDD[Long] = graph.mapReduceTriplets(
      mapFunc = (edge: EdgeTriplet[(String, Long), (Long, java.util.Date)]) => {
        val toSrc = Iterator((edge.srcId, edge.srcAttr._2 - edge.attr._1))
        val toDst = Iterator((edge.dstId, edge.dstAttr._2 + edge.attr._1))
        toSrc ++ toDst
      },
      reduceFunc = (a1: Long, a2: Long) => (a1 + a2)
    )
    println("\n\nconfirm vertices internal of newVertices")
    newVertices.collect().foreach(println(_))
    // (2,300)
    // (1,400)
    // (3,500)
  }
}

2. Structural Operations

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD, ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
}

Structural operations modify the structure of the graph.

The reverse operation returns a new graph with all edge directions reversed. It can be used, for example, to compute inverse PageRank.
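As a quick sketch (assuming an existing graph value; the tolerance passed to pageRank is an arbitrary choice), ranking vertices by outgoing rather than incoming links is just:

// Minimal sketch: PageRank over reversed edge directions
val inversePageRank = graph.reverse.pageRank(0.0001).vertices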

subgraph(epred: (EdgeTriplet[VD, ED]) => Boolean = x => true, vpred: (VertexId, VD) => Boolean = (v, d) => true): Graph[VD, ED]

epred: (EdgeTriplet[VD, ED]) => Boolean: epred receives an EdgeTriplet object and filters edges based on its attributes.

vpred: (VertexId, VD) => Boolean: vpred receives a (VertexId, VD) pair and filters vertices.

The subgraph operation takes vertex and edge predicates and returns a graph containing only the vertices that satisfy the vertex predicate, and the edges that satisfy the edge predicate and connect vertices that satisfy the vertex predicate.

The subgraph operation is useful in many situations, such as restricting a graph to the vertices and edges of interest, or eliminating broken links.

mask returns the common subgraph of the current graph and the other graph.

connectedComponents returns a graph whose vertex value is the id of that vertex's connected component (the lowest vertex id in the component).

A typical pattern: first run connectedComponents to produce a new graph ccGraph, then run subgraph on the original graph, and finally use mask to take the intersection, as sketched below and shown in full in Example 2.
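// Condensed sketch of the connectedComponents / subgraph / mask pattern
// (graph is assumed to have (name, role) vertex attributes, as in Example 2)
val ccGraph = graph.connectedComponents()
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
val validCCGraph = ccGraph.mask(validGraph)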

The groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] operation merges the parallel edges of a multigraph (i.e. duplicate edges between pairs of vertices). In many applications, parallel edges can be merged into a single edge (for example by summing their weights), reducing the size of the graph.

groupEdges merges identical edges; the merge function receives the attributes e1 and e2 of the two edges being merged.
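Note that groupEdges only merges parallel edges that land in the same edge partition, so on a graph with more than one partition you would normally call partitionBy first (the single-partition local examples below work without this step). A rough sketch, where multigraph is a hypothetical Graph with numeric edge weights:

import org.apache.spark.graphx.PartitionStrategy
// Co-locate identical edges, then sum their weights
val merged = multigraph
  .partitionBy(PartitionStrategy.EdgePartition2D)
  .groupEdges((e1, e2) => e1 + e2)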

Example 1. The following example removes broken links:

It filters out vertices whose second attribute is "Missing", together with their incident edges.

package com.graph.test

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.{Graph, Edge}
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by lichangyue on 2016/9/13.
 */
object FirstGraph1 {

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("FirstGraph").setMaster("local")
    val sc = new SparkContext(conf)

    val users = sc.parallelize(
      Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
        (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
        (4L, ("peter", "student"))))

    val relationships = sc.parallelize(
      Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
        Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))

    val defaultUser = ("John Doe", "Missing")

    // Build the initial graph
    val graph = Graph(users, relationships, defaultUser)

    // Vertex 0 is replaced by ("John Doe","Missing")
    graph.triplets.map(
      triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr
    ).collect.foreach(println(_))

    // Remove missing vertices as well as the edges connected to them;
    // vpred is the parameter name of the subgraph method
    val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
    validGraph.vertices.collect.foreach(println(_))
    validGraph.triplets.map(
      triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
    ).collect.foreach(println(_))
  }
}

Output:

istoica is the colleague of (franklin,prof)

rxin is the collab of (jgonzal,postdoc)

peter is the student of (John Doe,Missing)

franklin is the colleague of (John Doe,Missing)

franklin is the advisor of (rxin,student)

franklin is the pi of (jgonzal,postdoc)

(4,(peter,student))

(3,(rxin,student))

(7,(jgonzal,postdoc))

(5,(franklin,prof))

(2,(istoica,prof))

istoica is the colleague of franklin

rxin is the collab of jgonzal

franklin is the advisor of rxin

franklin is the pi of jgonzal

Example 2: mask demo:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.{Graph, Edge}
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by lichangyue on 2016/9/14.
 */
object TestMask {

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

    val conf = new SparkConf().setMaster("local").setAppName("mask")
    val sc = new SparkContext(conf)

    val users = sc.parallelize(
      Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
        (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
        (4L, ("peter", "student"))))

    // Create an RDD for edges
    val relationships = sc.parallelize(
      Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
        Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))

    // Define a default user in case there are relationships with missing users
    val defaultUser = ("John Doe", "Missing")
    val graph = Graph(users, relationships, defaultUser)

    println("graph:")
    graph.triplets.map(
      triplet => " srcid:" + triplet.srcId + ", dstID:" + triplet.dstId + ",srcAttr:" + triplet.srcAttr + " ,attr: " + triplet.attr + " ,dstAttr: " + triplet.dstAttr
    ).collect().foreach(println(_))

    // connectedComponents returns the component id (a vertex id) as the
    // vertex value; the original vertex attributes are gone
    val ccGraph = graph.connectedComponents()
    println("ccGraph:")
    ccGraph.triplets.map(
      triplet => " srcid:" + triplet.srcId + ", dstID:" + triplet.dstId + ",srcAttr:" + triplet.srcAttr + " ,attr: " + triplet.attr + " ,dstAttr: " + triplet.dstAttr
    ).collect().foreach(println(_))

    val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
    println("validGraph:")
    validGraph.triplets.map(
      triplet => " srcid:" + triplet.srcId + ", dstID:" + triplet.dstId + ",srcAttr:" + triplet.srcAttr + " ,attr: " + triplet.attr + " ,dstAttr: " + triplet.dstAttr
    ).collect().foreach(println(_))

    val validccGraph = ccGraph.mask(validGraph)
    println("validCCGraph:")
    validccGraph.triplets.map(
      triplet => " srcid:" + triplet.srcId + ", dstID:" + triplet.dstId + ",srcAttr:" + triplet.srcAttr + " ,attr: " + triplet.attr + " ,dstAttr: " + triplet.dstAttr
    ).collect().foreach(println(_))
  }
}

Output:

graph:

srcid:2, dstID:5,srcAttr:(istoica,prof) ,attr: colleague ,dstAttr: (franklin,prof)

srcid:3, dstID:7,srcAttr:(rxin,student) ,attr: collab ,dstAttr: (jgonzal,postdoc)

srcid:4, dstID:0,srcAttr:(peter,student) ,attr: student ,dstAttr: (John Doe,Missing)

srcid:5, dstID:0,srcAttr:(franklin,prof) ,attr: colleague ,dstAttr: (John Doe,Missing)

srcid:5, dstID:3,srcAttr:(franklin,prof) ,attr: advisor ,dstAttr: (rxin,student)

srcid:5, dstID:7,srcAttr:(franklin,prof) ,attr: pi ,dstAttr: (jgonzal,postdoc)

ccGraph:

srcid:2, dstID:5,srcAttr:0 ,attr: colleague ,dstAttr: 0

srcid:3, dstID:7,srcAttr:0 ,attr: collab ,dstAttr: 0

srcid:4, dstID:0,srcAttr:0 ,attr: student ,dstAttr: 0

srcid:5, dstID:0,srcAttr:0 ,attr: colleague ,dstAttr: 0

srcid:5, dstID:3,srcAttr:0 ,attr: advisor ,dstAttr: 0

srcid:5, dstID:7,srcAttr:0 ,attr: pi ,dstAttr: 0

validGraph:

srcid:2, dstID:5,srcAttr:(istoica,prof) ,attr: colleague ,dstAttr: (franklin,prof)

srcid:3, dstID:7,srcAttr:(rxin,student) ,attr: collab ,dstAttr: (jgonzal,postdoc)

srcid:5, dstID:3,srcAttr:(franklin,prof) ,attr: advisor ,dstAttr: (rxin,student)

srcid:5, dstID:7,srcAttr:(franklin,prof) ,attr: pi ,dstAttr: (jgonzal,postdoc)

validCCGraph:

srcid:2, dstID:5,srcAttr:0 ,attr: colleague ,dstAttr: 0

srcid:3, dstID:7,srcAttr:0 ,attr: collab ,dstAttr: 0

srcid:5, dstID:3,srcAttr:0 ,attr: advisor ,dstAttr: 0

srcid:5, dstID:7,srcAttr:0 ,attr: pi ,dstAttr: 0

Example 3: groupEdges. As described above, groupEdges merges the parallel edges of a multigraph; in the example below, e1 and e2 are the attributes of the two edges being merged.

import java.text.SimpleDateFormat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.{Graph, Edge, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by lichangyue on 2016/9/14.
 */
object StructOpter {

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

    val conf = new SparkConf()
    val sc = new SparkContext("local", "test", conf)

    // subgraph-vertices.csv:
    // 1,Taro,100
    // 2,Jiro,200
    // 3,Sabo,300
    val vertexLines: RDD[String] = sc.textFile("hdfs://S7SA053:8020/stat/subgraph-vertices.csv")
    val v: RDD[(VertexId, (String, Long))] = vertexLines.map(line => {
      val cols = line.split(",")
      (cols(0).toLong, (cols(1), cols(2).trim.toLong))
    })

    val format = new SimpleDateFormat("yyyy/MM/dd")

    // subgraph-edges.csv:
    // 1,2,100,2014/12/1
    // 2,3,200,2014/12/2
    // 3,1,300,2014/12/3
    val edgeLines = sc.textFile("hdfs://S7SA053:8020/stat/subgraph-edges.csv")
    val e = edgeLines.map(line => {
      val cols = line.split(",")
      Edge(cols(0).toLong, cols(1).toLong, (cols(2).trim.toLong, format.parse(cols(3).trim)))
    })

    // Build the graph
    val graph = Graph(v, e)

    println("\n\nconfirm vertices internal of graph")
    graph.vertices.collect.foreach(println(_))
    // (1,(Taro,100))
    // (3,(Sabo,300))
    // (2,(Jiro,200))

    println("\n\nconfirm edges internal of graph")
    graph.edges.collect.foreach(println(_))
    // Edge(1,2,(100,Mon Dec 01 00:00:00 CST 2014))
    // Edge(2,3,(200,Tue Dec 02 00:00:00 CST 2014))
    // Edge(3,1,(300,Wed Dec 03 00:00:00 CST 2014))

    // 1. reverse: edge directions are flipped
    println("\n\nconfirm edges of reversed graph")
    graph.reverse.edges.collect.foreach(println)
    // Edge(1,3,(300,Wed Dec 03 00:00:00 CST 2014))
    // Edge(2,1,(100,Mon Dec 01 00:00:00 CST 2014))
    // Edge(3,2,(200,Tue Dec 02 00:00:00 CST 2014))

    // 2. subgraph
    println("\n\nconfirm subgraphed vertices graph")
    // Build a subgraph from a vertex predicate
    graph.subgraph(vpred = (vid, v) => v._2 >= 200).vertices.collect.foreach(println(_))
    // (3,(Sabo,300))
    // (2,(Jiro,200))

    println("\n\nconfirm subgraphed edges graph")
    // Build a subgraph from an edge predicate
    graph.subgraph(epred = edge => edge.attr._1 >= 200).edges.collect.foreach(println(_))

    // Constrain vertices and edges at the same time
    val subgraph = graph.subgraph(
      vpred = (vid, v) => v._2 >= 200,
      epred = edge => edge.attr._1 >= 200)
    println("\n\nvertex and edge predicates combined")
    subgraph.edges.collect.foreach(println(_))
    // Edge(2,3,(200,Tue Dec 02 00:00:00 CST 2014))

    // 3. mask: returns the intersection of the two graphs
    val maskedgraph = graph.mask(subgraph)
    println("\nmask operation")
    maskedgraph.vertices.collect.foreach(println(_))
    // (3,(Sabo,300))
    // (2,(Jiro,200))
    maskedgraph.edges.collect.foreach(println(_))
    // Edge(2,3,(200,Tue Dec 02 00:00:00 CST 2014))

    // 4. groupEdges
    // edgegroup.csv:
    // 1,2,100,2014/12/1
    // 1,2,110,2014/12/11
    // 2,3,200,2014/12/21
    // 2,3,210,2014/12/2
    // 3,1,300,2014/12/3
    // 3,1,310,2014/12/31
    val edgeLines2 = sc.textFile("hdfs://S7SA053:8020/stat/edgegroup.csv")
    val e2 = edgeLines2.map(line => {
      val cols = line.split(",")
      Edge(cols(0).toLong, cols(1).toLong, (cols(2).trim.toLong, format.parse(cols(3))))
    })

    // Build the second graph
    val graph2 = Graph(v, e2)

    // Use groupEdges to merge parallel edges: sum the weights and
    // keep the earlier of the two dates
    val edgeGroupedGraph = graph2.groupEdges(
      merge = (e1, e2) => (e1._1 + e2._1, if (e1._2.getTime < e2._2.getTime) e1._2 else e2._2))
    println("\n\nconfirm merged edges graph")
    edgeGroupedGraph.edges.collect.foreach(println)
    // Edge(1,2,(210,Mon Dec 01 00:00:00 CST 2014))
    // Edge(2,3,(410,Tue Dec 02 00:00:00 CST 2014))
    // Edge(3,1,(610,Wed Dec 03 00:00:00 CST 2014))

    sc.stop()
  }
}

3. Join Operations

(modifying attributes with join operations)

In many cases it is necessary to join external data with a graph.

For example, we might have extra user properties that we want to merge into an existing graph, or we might want to pull vertex properties from one graph into another. These tasks can be accomplished with the join operators.

The join operators:

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

joinVertices joins on vertex id and replaces the attribute of each matched vertex with the result of the user map function; vertices without a match in users keep their original attribute.

outerJoinVertices assigns the attributes from the users RDD to the vertices of the graph; if a vertex of the graph is not present in the users RDD, the map function receives None.
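For intuition, joinVertices is essentially a special case of outerJoinVertices that keeps the old attribute when there is no match; a sketch adapted from how GraphOps defines it (this lives inside GraphOps[VD, ED], where graph and scala.reflect.ClassTag are in scope):

// Sketch: joinVertices expressed in terms of outerJoinVertices
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED] = {
  val uf = (id: VertexId, data: VD, o: Option[U]) => {
    o match {
      case Some(u) => mapFunc(id, data, u) // matched: apply the user map function
      case None => data                    // unmatched: keep the old attribute
    }
  }
  graph.outerJoinVertices(table)(uf)
}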

Note that if the RDD contains more than one value for a given vertex, only one of them will be used. It is therefore recommended to make the input RDD unique with the following pattern, which also pre-indexes the resulting values to speed up the subsequent join.

val nonUniqueCosts: RDD[(VertexId, Double)]
val uniqueCosts: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex(nonUniqueCosts, (a, b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
  (id, oldCost, extraCost) => oldCost + extraCost)

Example: using joinVertices and outerJoinVertices to modify the vertex attributes of a graph.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Example: using joinVertices and outerJoinVertices to modify the vertex attributes of a graph
 * Created by lichangyue on 2016/9/18.
 */
object TestJoin {

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

    val conf = new SparkConf()
    val sc = new SparkContext("local", "test", conf)

    // Build a graph from edge information
    // join-edges.tsv:
    // 1 2
    // 2 3
    // 3 1
    val graph = GraphLoader.edgeListFile(sc, "hdfs://S7SA053:8020/stat/join-edges.tsv").cache()

    // Read vertex information in [vid, name] form
    // join-vertices.tsv:
    // 1,Taro
    // 2,Jiro
    val vertexLines = sc.textFile("hdfs://S7SA053:8020/stat/join-vertices.tsv")
    val users = vertexLines.map(line => {
      val cols = line.split(",")
      (cols(0).toLong, cols(1))
    })

    // Add the vertex attributes in users to graph, producing graph2.
    // joinVertices joins on id and replaces each matched vertex attribute
    // with the one from users; unmatched vertices keep their attribute.
    // First blank out the vertex attributes of the graph;
    // ((vid, attr, user) => user) is the map function.
    val graph2 = graph.mapVertices((id, attr) => "").joinVertices(users)(
      (vid, attr, user) => user)
    println("\n\nConfirm vertices Internal of graph2")
    graph2.vertices.collect().foreach(println(_))
    // (1,Taro)
    // (3,)
    // (2,Jiro)

    // outerJoinVertices assigns the attributes in users to the graph's
    // vertices; vertices missing from users are assigned "None"
    val graph3 = graph.mapVertices((id, attr) => "").outerJoinVertices(users) {
      (vid, attr, user) => user.getOrElse("None")
    }
    println("\n\nconfirm vertices Internal of graph3")
    graph3.vertices.collect.foreach(println(_))
    // (1,Taro)
    // (3,None)
    // (2,Jiro)
    // If a vertex of graph is present in users, it receives the attribute
    // from users; otherwise it is assigned "None"

    sc.stop()
  }
}

4. Neighborhood Aggregation

A key step in many graph analytics tasks is aggregating information about the neighborhood of each vertex. For example, we might want to know the number of followers each user has, or the average age of each user's followers. Many iterative graph algorithms (e.g. PageRank, shortest paths, and connected components) repeatedly aggregate the attributes of neighboring vertices.

To improve performance, the primary aggregation operator changed from graph.mapReduceTriplets to the new graph.aggregateMessages.

4.1. Aggregate Messages

The core aggregation operation in GraphX is aggregateMessages[A: ClassTag](sendMsg: EdgeContext[VD, ED, A] => Unit, mergeMsg: (A, A) => A, tripletFields: TripletFields): VertexRDD[A]. This operation applies a user-defined sendMsg function to each edge triplet in the graph, and then uses the mergeMsg function to aggregate the resulting messages at their destination vertices.

aggregateMessages thus covers both the map and the reduce steps.

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}

The first example above also used another Graph API:

mapReduceTriplets(): computes values from the edges and vertices adjacent to each vertex. The user-defined mapFunc is invoked on every edge of the graph and can emit zero or more messages to either of that edge's two endpoint vertices; reduceFunc then merges the map-phase outputs arriving at each vertex.
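As a rough sketch of the migration from mapReduceTriplets to aggregateMessages (g is a hypothetical Graph[Double, Long]; the per-vertex sum is arbitrary):

// Old style (deprecated): mapFunc returns an Iterator of (vertexId, message) pairs
val oldSums: VertexRDD[Double] = g.mapReduceTriplets[Double](
  triplet => Iterator((triplet.dstId, triplet.srcAttr)),
  (a, b) => a + b)
// New style: sendMsg receives an EdgeContext and sends messages imperatively
val newSums: VertexRDD[Double] = g.aggregateMessages[Double](
  ctx => ctx.sendToDst(ctx.srcAttr),
  (a, b) => a + b)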

Example: use aggregateMessages to compute the average age of each user's older followers.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by lichangyue on 2016/9/18.
 */
object TestAggregateMessage {

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

    val conf = new SparkConf()
    val sc = new SparkContext("local", "test", conf)

    // Create a graph with "age" as the vertex property.
    // Here we use a random graph for simplicity.
    val graph = GraphGenerators.logNormalGraph(sc, numVertices = 10)
      .mapVertices((id, _) => id.toDouble)
    graph.vertices.collect.foreach(println(_))
    graph.edges.collect.foreach(println(_))

    // Compute the number of older followers and their total age
    val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
      triplet => { // Map Function
        if (triplet.srcAttr > triplet.dstAttr) {
          // Send message to destination vertex containing counter and age
          triplet.sendToDst(1, triplet.srcAttr)
        }
      },
      // Reduce Function: merges the values sent by sendToDst
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )
    println("")
    olderFollowers.collect.foreach(println(_))
    // (4,(2,17.0))
    // (0,(4,18.0))
    // (1,(3,11.0))
    // (6,(3,24.0))
    // (3,(3,18.0))
    // (7,(2,17.0))
    // (8,(2,18.0))
    // (5,(2,16.0))
    // (2,(2,16.0))

    // Divide total age by the follower count to get the average
    val avgAgeOfOlderFollowers = olderFollowers.mapValues((id, value) => value match {
      case (count, totalAge) => totalAge / count
    })
    avgAgeOfOlderFollowers.collect().foreach(println(_))
    // (4,8.5)
    // (0,4.5)
    // (1,3.6666666666666665)
    // (6,8.0)
    // (3,6.0)
    // (7,8.5)
    // (8,9.0)
    // (5,8.0)
    // (2,8.0)
  }
}

4.2. Computing Degree Information

The most common aggregation task is computing the degree of each vertex: the number of edges adjacent to it. In directed graphs it is often necessary to know the in-degree, out-degree, and total degree of each vertex. The GraphOps class contains a collection of operators for computing the degrees of each vertex. For example, the following computes the maximum in-degree, out-degree, and total degree:

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)

4.3. Collecting Neighbors

In some cases it may be easier to express a computation by collecting the neighboring vertices and their attributes at each vertex.

class GraphOps[VD, ED] {
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
}

Example: computing degrees and neighbors.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.{EdgeDirection, VertexId, GraphLoader}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by lichangyue on 2016/9/18.
 */
object TestDegree {

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

    val conf = new SparkConf()
    val sc = new SparkContext("local", "test", conf)

    // degree.csv:
    // 2 1
    // 3 1
    // 4 1
    // 5 1
    // 1 2
    // 4 3
    // 5 3
    // 1 4
    val graph = GraphLoader.edgeListFile(sc, "hdfs://S7SA053:8020/stat/degree.csv").cache()

    println("\n\nConfirm vertices Internal of graph")
    graph.vertices.collect().foreach(println(_))
    // (4,1)
    // (2,1)
    // (1,1)
    // (3,1)
    // (5,1)

    println("\n\nconfirm edges internal of graph")
    graph.edges.collect.foreach(println(_))
    // Edge(2,1,1)
    // Edge(3,1,1)
    // Edge(4,1,1)
    // Edge(5,1,1)
    // Edge(1,2,1)
    // Edge(1,4,1)
    // Edge(4,3,1)
    // Edge(5,3,1)

    // 1. degrees
    println("\n\nconfirm inDegrees")
    graph.inDegrees.collect.foreach(d => println(d._1 + "'s inDegree is " + d._2))
    // 4's inDegree is 1
    // 1's inDegree is 4
    // 3's inDegree is 2
    // 2's inDegree is 1

    println("\n\nconfirm outDegrees")
    graph.outDegrees.collect.foreach(d => println(d._1 + "'s outDegree is " + d._2))
    // 4's outDegree is 2
    // 1's outDegree is 2
    // 3's outDegree is 1
    // 5's outDegree is 2
    // 2's outDegree is 1

    println("\n\nconfirm degrees")
    graph.degrees.collect.foreach(d => println(d._1 + "'s degree is " + d._2))
    // 4's degree is 3
    // 1's degree is 6
    // 3's degree is 3
    // 5's degree is 2
    // 2's degree is 2

    def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
      if (a._2 > b._2) a else b
    }

    println("\n\nconfirm max inDegrees")
    println(graph.inDegrees.reduce(max))
    // (1,4)

    // 2. collectNeighborIds
    println("\n\nconfirm collectNeighborIds(In)")
    graph.collectNeighborIds(EdgeDirection.In)
      .collect
      .foreach(n => println(n._1 + "'s in neighbors: " + n._2.mkString(",")))
    // 4's in neighbors: 1
    // 2's in neighbors: 1
    // 1's in neighbors: 2,3,4,5
    // 3's in neighbors: 4,5
    // 5's in neighbors:

    println("\n\nconfirm collectNeighborIds(Out)")
    graph.collectNeighborIds(EdgeDirection.Out)
      .collect
      .foreach(n => println(n._1 + "'s out neighbors: " + n._2.mkString(",")))
    // 4's out neighbors: 1,3
    // 1's out neighbors: 2,4
    // 3's out neighbors: 1
    // 5's out neighbors: 1,3
    // 2's out neighbors:

    println("\n\nconfirm collectNeighborIds(Either)")
    graph.collectNeighborIds(EdgeDirection.Either)
      .collect
      .foreach(n => println(n._1 + "'s neighbors: " + n._2.distinct.mkString(",")))
    // 4's neighbors: 1,3
    // 2's neighbors: 1
    // 1's neighbors: 2,3,4,5
    // 3's neighbors: 1,4,5
    // 5's neighbors: 1,3

    // 3. collectNeighbors
    println("\n\nconfirm collectNeighbors(In)")
    graph.collectNeighbors(EdgeDirection.In)
      .collect
      .foreach(n => println(n._1 + "'s in neighbors: " + n._2.mkString(",")))
    // 4's in neighbors: (1,1)
    // 1's in neighbors: (2,1),(3,1),(4,1),(5,1)
    // 3's in neighbors: (4,1),(5,1)
    // 5's in neighbors:
    // 2's in neighbors: (1,1)

    println("\n\nconfirm collectNeighbors(Out)")
    graph.collectNeighbors(EdgeDirection.Out)
      .collect
      .foreach(n => println(n._1 + "'s out neighbors: " + n._2.mkString(",")))
    // 4's out neighbors: (1,1),(3,1)
    // 1's out neighbors: (2,1),(4,1)
    // 3's out neighbors: (1,1)
    // 5's out neighbors: (1,1),(3,1)
    // 2's out neighbors: (1,1)

    println("\n\nconfirm collectNeighbors(Either)")
    graph.collectNeighbors(EdgeDirection.Either)
      .collect
      .foreach(n => println(n._1 + "'s neighbors: " + n._2.distinct.mkString(",")))
    // 4's neighbors: (1,1),(3,1)
    // 2's neighbors: (1,1)
    // 1's neighbors: (2,1),(3,1),(4,1),(5,1)
    // 3's neighbors: (1,1),(4,1),(5,1)
    // 5's neighbors: (1,1),(3,1)

    sc.stop
  }
}

5. Caching and Uncaching

In Spark, RDDs are not cached by default. To avoid recomputation, they must be explicitly cached when they are used multiple times. Graphs in GraphX behave the same way: when a graph is needed multiple times, make sure to call Graph.cache() on it first.
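A minimal sketch of the pattern (the file path is a placeholder):

// Cache a graph that several actions will reuse
val graph = GraphLoader.edgeListFile(sc, "hdfs://.../edges.tsv").cache()
val numVertices = graph.vertices.count() // the first action materializes the cache
val maxDegree = graph.degrees.reduce((a, b) => if (a._2 > b._2) a else b) // reuses cached data
// In iterative computations, unpersist intermediate graphs once they are no longer needed
graph.unpersist(blocking = false)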

