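First, prepare the following social graph data (five celebrities linked by spouse and friend relationships):
Start spark-shell.
Import the required packages: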
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
Create the graph object for the data above:
// Create an RDD for the vertices: (id, (name, isMale))
val users: RDD[(VertexId, (String, Boolean))] = sc.parallelize(Array(
  (1L, ("Li Yapeng", true)), (2L, ("Wang Fei", false)), (3L, ("Xie Tingfeng", true)),
  (4L, ("Zhang Bozhi", false)), (5L, ("Chen Guanxi", true))))
// Create an RDD for the edges: Edge(srcId, dstId, relationship)
val relationships: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "spouse"), Edge(2L, 3L, "spouse"), Edge(3L, 4L, "spouse"),
  Edge(4L, 5L, "friend"), Edge(3L, 5L, "friend")))
// Define a default user in case a relationship refers to a missing user
val defaultUser = ("Who?", false)
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
Try printing all the male celebrities:
graph.vertices.filter(_._2._2).collect
res8: Array[(org.apache.spark.graphx.VertexId, (String, Boolean))] = Array((1,(Li Yapeng,true)), (3,(Xie Tingfeng,true)), (5,(Chen Guanxi,true)))
Print the celebrity IDs in descending order of degree, that is, by how many relationships each one is involved in:
graph.degrees.sortBy(_._2, false).collect
res12: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((3,3), (2,2), (4,2), (5,2), (1,1))
That does not tell us the celebrities' names, though, so join the degrees with the vertices:
graph.degrees.leftJoin(graph.vertices)((vid, vd1, vd2) => (vd2.get._1, vd1)).sortBy(_._2._2, false).collect
res35: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((3,(Xie Tingfeng,3)), (2,(Wang Fei,2)), (4,(Zhang Bozhi,2)), (5,(Chen Guanxi,2)), (1,(Li Yapeng,1)))
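Calling vd2.get directly is a bit risky, because it throws if the Option is empty. Here is a safer version that falls back to an empty name: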
graph.degrees.leftJoin(graph.vertices)((vid, vd1, vd2) => (vd2.map(_._1).getOrElse(""), vd1)).sortBy(_._2._2, false).collect
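To make the degree ranking and the Option handling easier to follow, here is a minimal sketch in plain Scala collections, with no Spark involved. It is not how GraphX computes degrees internally; the names `DegreeSketch`, `degrees` and `ranked` are made up for this illustration, and ties are broken by vertex ID as an assumption.

```scala
object DegreeSketch {
  // Same data as the tutorial: vertex id -> name, plus the five edges.
  val names: Map[Long, String] = Map(
    1L -> "Li Yapeng", 2L -> "Wang Fei", 3L -> "Xie Tingfeng",
    4L -> "Zhang Bozhi", 5L -> "Chen Guanxi")
  val edges: Seq[(Long, Long)] =
    Seq((1L, 2L), (2L, 3L), (3L, 4L), (4L, 5L), (3L, 5L))

  // Degree = number of edges touching a vertex, regardless of direction.
  val degrees: Map[Long, Int] =
    edges.flatMap { case (src, dst) => Seq(src, dst) }
      .groupBy(identity)
      .map { case (id, hits) => (id, hits.size) }

  // names.get returns an Option, much like the third argument handed to the
  // leftJoin function; getOrElse supplies a fallback instead of throwing.
  def ranked(degs: Map[Long, Int]): Seq[(Long, (String, Int))] =
    degs.toSeq
      .map { case (id, d) => (id, (names.get(id).getOrElse(""), d)) }
      .sortBy { case (id, (_, d)) => (-d, id) } // highest degree first (tie-break by id is an assumption)
}
```

Calling `DegreeSketch.ranked(DegreeSketch.degrees)` gives the ranked list for the tutorial's graph, and passing a map that contains an unknown ID shows the empty-name fallback.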
Now test the messaging mechanism by sending each person the names of their spouses:
graph.aggregateMessages[String](
  ctx => {
    // Only spouse edges produce messages; each endpoint receives the other's name
    if (ctx.attr == "spouse") {
      ctx.sendToSrc(ctx.dstAttr._1)
      ctx.sendToDst(ctx.srcAttr._1)
    }
  },
  // Merge multiple messages arriving at the same vertex
  (s1, s2) => s1 + "|" + s2
).collect
res46: Array[(org.apache.spark.graphx.VertexId, String)] = Array((1,Wang Fei), (2,Li Yapeng|Xie Tingfeng), (3,Wang Fei|Zhang Bozhi), (4,Xie Tingfeng))
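The send-then-merge idea can also be sketched with plain Scala collections. This is only an illustration of the two phases, not the GraphX implementation; `SpouseMessagesSketch`, `Rel`, `sent` and `merged` are hypothetical names introduced here.

```scala
object SpouseMessagesSketch {
  final case class Rel(src: Long, dst: Long, kind: String)

  val names: Map[Long, String] = Map(
    1L -> "Li Yapeng", 2L -> "Wang Fei", 3L -> "Xie Tingfeng",
    4L -> "Zhang Bozhi", 5L -> "Chen Guanxi")
  val rels: Seq[Rel] = Seq(
    Rel(1L, 2L, "spouse"), Rel(2L, 3L, "spouse"), Rel(3L, 4L, "spouse"),
    Rel(4L, 5L, "friend"), Rel(3L, 5L, "friend"))

  // "Send" phase: every spouse edge emits one (recipient, message) pair
  // per endpoint, carrying the name of the vertex at the other end.
  val sent: Seq[(Long, String)] =
    rels.filter(_.kind == "spouse").flatMap { r =>
      Seq((r.src, names(r.dst)), (r.dst, names(r.src)))
    }

  // "Merge" phase: messages addressed to the same vertex are joined with "|".
  // Vertices that received nothing simply do not appear in the result.
  val merged: Map[Long, String] =
    sent.groupBy(_._1).map { case (id, msgs) =>
      (id, msgs.map(_._2).reduce(_ + "|" + _))
    }
}
```

As in the GraphX result, vertex 5 is absent from `merged` because it has no spouse edge and therefore receives no message.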
Output the PageRank values:
import org.apache.spark.graphx.lib
graph.pageRank(0.01).vertices.collect.foreach(println)
(1,0.15)
(2,0.27749999999999997)
(3,0.38587499999999997)
(4,0.313996875)
(5,0.58089421875)
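The numbers above are consistent with the fixed point of the unnormalized update rank(v) = 0.15 + 0.85 * sum(rank(u) / outDegree(u)) over the incoming edges u -> v. The following plain Scala sketch iterates that rule on the same five edges. It assumes a reset probability of 0.15 and is not the GraphX code itself; `PageRankSketch`, `step` and `ranks` are hypothetical names, and other Spark versions may treat sink vertices or normalization differently, so their exact output can vary.

```scala
object PageRankSketch {
  val vertices: Seq[Long] = Seq(1L, 2L, 3L, 4L, 5L)
  val edges: Seq[(Long, Long)] =
    Seq((1L, 2L), (2L, 3L), (3L, 4L), (4L, 5L), (3L, 5L))
  val resetProb = 0.15 // assumed reset probability

  val outDegree: Map[Long, Int] =
    edges.groupBy(_._1).map { case (src, es) => (src, es.size) }

  // One update: each edge carries rank(src) / outDegree(src) to its destination,
  // and every vertex then mixes the incoming total with the reset probability.
  def step(ranks: Map[Long, Double]): Map[Long, Double] = {
    val incoming: Map[Long, Double] = edges
      .map { case (src, dst) => (dst, ranks(src) / outDegree(src)) }
      .groupBy(_._1)
      .map { case (dst, contribs) => (dst, contribs.map(_._2).sum) }
    vertices.map { v =>
      (v, resetProb + (1 - resetProb) * incoming.getOrElse(v, 0.0))
    }.toMap
  }

  // Start every vertex at 1.0 and apply the update a fixed number of times.
  def ranks(iterations: Int): Map[Long, Double] = {
    val start: Map[Long, Double] = vertices.map(v => (v, 1.0)).toMap
    (1 to iterations).foldLeft(start)((r, _) => step(r))
  }
}
```

Because this graph has no cycles, a handful of iterations is enough for `PageRankSketch.ranks(10)` to settle on the values printed above, up to floating-point rounding.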
Count the triangles each celebrity is part of:
graph.triangleCount.vertices.collect
res48: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((1,0), (2,0), (3,1), (4,1), (5,1))
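One common way to count triangles per vertex is to intersect neighbor sets. Here is a small sketch of that idea in plain Scala, treating the edges as undirected. It is meant only to show where the counts come from, not to mirror the GraphX implementation; `TriangleSketch`, `neighbors` and `triangles` are hypothetical names.

```scala
object TriangleSketch {
  val vertices: Seq[Long] = Seq(1L, 2L, 3L, 4L, 5L)
  val edges: Seq[(Long, Long)] =
    Seq((1L, 2L), (2L, 3L), (3L, 4L), (4L, 5L), (3L, 5L))

  // Undirected neighbor sets: each edge contributes in both directions.
  val neighbors: Map[Long, Set[Long]] =
    edges.flatMap { case (a, b) => Seq((a, b), (b, a)) }
      .groupBy(_._1)
      .map { case (v, ns) => (v, ns.map(_._2).toSet) }

  // For a vertex v, a triangle {v, a, b} is seen twice: once when visiting
  // neighbor a (b is shared) and once when visiting b (a is shared),
  // so the total number of shared neighbors is halved.
  val triangles: Map[Long, Int] = vertices.map { v =>
    val ns = neighbors.getOrElse(v, Set.empty[Long])
    // toSeq keeps one count per neighbor; mapping the Set directly would
    // collapse equal counts into a single element.
    val shared = ns.toSeq.map(n => (ns intersect neighbors(n)).size).sum
    (v, shared / 2)
  }.toMap
}
```

The only triangle in this graph is formed by vertices 3, 4 and 5, which is why those three get a count of 1 and the other two get 0.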