使用GraphX构建图和查看图属性
使用GraphX可以为完整的图分析工作流或管道提供一个集成平台。图分析管道通常包括以下步骤:
- (1)读取原始数据。
- (2)预处理数据(如清洗数据)。
- (3)提取顶点和边来创建属性图。
- (4)切分子图。
- (5)运行图算法。
- (6)分析结果。
- (7)对图的另一部分重复步骤5和6。
GraphX的Graph对象是用户操作图的入口,它包含了边(edges)、顶点(vertices)以及triplets三部分,并且这三部分都包含相应的属性,可以携带额外的信息。
构建图的入口方法有两种,分别是根据边构建和根据边的两个顶点构建。
- 根据边构建图:Graph.fromEdges。
- 根据边的两个顶点数据构建:Graph.fromEdgeTuples。
不管是根据边构建图还是根据边的两个顶点数据构建,最终都是使用GraphImpl来构建的,即调用了GraphImpl的apply方法。
构建图的过程很简单,分为三步,它们分别是构建边EdgeRDD、构建顶点VertexRDD、生成Graph对象。在本节中,我们将学习如何使用GraphX构造和转换图。我们以一个社交网络为例,它有10个顶点和14条边,如下图所示:
我们创建上边这个属性图,表示类似于Twitter的用户网络的社交网络。在这个属性图中,顶点表示用户,有向边表示“follows”关系。顶点中的数字代表顶点id。
1. 构造图
Spark GraphX库的Graph对象提供了从RDD构造图的工厂方法。一种常用的方法是使用一个包含元组(由一个顶点ID和一个顶点属性对象组成)的RDD和一个包含Edge对象的RDD来实例化一个Graph对象。我们将使用这个方法来构造上图中的示例图。
// 首先,需要导入所需的类 import org.apache.spark.graphx.{Edge, Graph} import org.apache.spark.sql.SparkSession ...... // case class:用来代表顶点属性,存储一个用户的信息 case class User(name: String, age: Int) def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local") .appName("cep demo") .getOrCreate() // 构造顶点RDDs val users = List( (1L, User("Alex", 26)), (2L, User("Bill", 42)), (3L, User("Carol", 18)), (4L, User("Dave", 16)), (5L, User("Eve", 45)), (6L, User("Farell", 30)), (7L, User("Garry", 32)), (8L, User("Harry", 36)), (9L, User("Ivan", 28)), (10L, User("Jill", 48)) ) val usersRDD = spark.sparkContext.parallelize(users) // 构造边RDDs val follows = List( Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1), Edge(3L, 4L, 1), Edge(3L, 5L, 1), Edge(4L, 5L, 1), Edge(6L, 5L, 1), Edge(7L, 6L, 1), Edge(6L, 8L, 1), Edge(7L, 8L, 1), Edge(7L, 9L, 1), Edge(9L, 8L, 1), Edge(8L, 10L, 1), Edge(10L, 9L, 1), Edge(1L, 11L, 1) ) val followsRDD = spark.sparkContext.parallelize(follows) // 创建默认属性集(id为11的顶点没有任何属性) // 它会将默认属性分配给没有被显式分配任何属性的顶点 val defaultUser = User("NA", 0) // 构造图对象 val socialGraph = Graph(usersRDD, followsRDD, defaultUser) }
前面代码片段中的Graph对象从顶点RDD和边RDD创建Graph类的实例。
2. 使用GraphX API查看图属性
正如我们所说,在GraphX中表示图的主类是Graph。但是还有一个GraphOps类,它的方法被隐式添加到了Graph对象。其中包括获取顶点节点数量、入度、出度等的属性和方法。
GraphOps API:查看
下面查看图的一些属性:
// 边的数量 val numEdges = socialGraph.numEdges println("边的数量:" + numEdges) // 顶点的数量 val numVertices = socialGraph.numVertices println("顶点的数量:" + numVertices) // 顶点的入度 val inDegrees = socialGraph.inDegrees println("顶点的入度:") inDegrees.collect.foreach(println) // 顶点的出度 val outDegrees = socialGraph.outDegrees println("顶点的出度:") outDegrees.collect.foreach(println) // 顶点的出入度总和 val degrees = socialGraph.degrees println("顶点的出入度总和:") degrees.collect.foreach(println)
输出如下:
边的数量:15 顶点的数量:11 顶点的入度: (4,1) (11,1) (1,1) (6,1) (3,1) (9,2) (8,3) (10,1) (5,3) (2,1) 顶点的出度: (4,1) (1,2) (6,2) (3,3) (7,3) (9,1) (8,1) (10,1) (2,1) 顶点的出入度总和: (4,2) (11,1) (1,3) (6,3) (3,4) (7,3) (9,3) (8,4) (10,2) (5,3) (2,2)
获得属性图中顶点的集合视图,代码如下:
val vertices = socialGraph.vertices // 获得所有的顶点 println("属性图中顶点的集合视图:") vertices.collect.foreach(println)
输出如下:
属性图中顶点的集合视图: (4,User(Dave,16)) (11,User(NA,0)) (1,User(Alex,26)) (6,User(Farell,30)) (3,User(Carol,18)) (7,User(Garry,32)) (9,User(Ivan,28)) (8,User(Harry,36)) (10,User(Jill,48)) (5,User(Eve,45)) (2,User(Bill,42))
获得属性图中边的集合视图,代码如下:
val edges = socialGraph.edges // 获得所有的边 println("属性图中边的集合视图:") edges.collect.foreach(println)
输出如下:
属性图中边的集合视图: Edge(1,2,1) Edge(1,11,1) Edge(2,3,1) Edge(3,1,1) Edge(3,4,1) Edge(3,5,1) Edge(4,5,1) Edge(6,5,1) Edge(6,8,1) Edge(7,6,1) Edge(7,8,1) Edge(7,9,1) Edge(8,10,1) Edge(9,8,1) Edge(10,9,1)
在org.apache.spark.graphx包中有一个EdgeTriplet类,它的一个实例表示一条边和该边连接的两个顶点的组合,存储一条边的属性和它连接的两个顶点。EdgeTriplet类继承自Edge类,增加了分别包含源属性和目标属性的srcAttr和dstAttr成员。它还包含边的源顶点和目标顶点的惟一标识符。
// 获得属性图中triplet集合视图 val triplets = socialGraph.triplets // triplets.take(3) // 输出源顶点ID、源顶点属性name、目标顶点ID和目标顶点属性name triplets.take(3).foreach{ t => println(t.srcId + "," + t.srcAttr.name + " ---> " + t.dstId + "," + t.dstAttr.name) } // triplets.collect.foreach(println)
输出如下:
1,Alex ---> 2,Bill 1,Alex ---> 11,NA 2,Bill ---> 3,Carol