使用GraphX构建图和查看图属性

使用GraphX可以为完整的图分析工作流或管道提供一个集成平台。图分析管道通常包括以下步骤:

  • (1)读取原始数据。
  • (2)预处理数据(如清洗数据)。
  • (3)提取顶点和边来创建属性图。
  • (4)切分子图。
  • (5)运行图算法。
  • (6)分析结果。
  • (7)对图的另一部分重复步骤5和6。

GraphX的Graph对象是用户操作图的入口,它包含了边(edges)、顶点(vertices)以及triplets三部分,并且这三部分都包含相应的属性,可以携带额外的信息。

构建图的入口方法有两种,分别是根据边构建和根据边的两个顶点构建。

  • 根据边构建图:Graph.fromEdges。
  • 根据边的两个顶点数据构建:Graph.fromEdgeTuples。

不管是根据边构建图还是根据边的两个顶点数据构建,最终都是使用GraphImpl来构建的,即调用了GraphImpl的apply方法。

构建图的过程很简单,分为三步,它们分别是构建边EdgeRDD、构建顶点VertexRDD、生成Graph对象。在本节中,我们将学习如何使用GraphX构造和转换图。我们以一个社交网络为例,它有10个顶点和14条边,如下图所示:

我们创建上边这个属性图,表示类似于Twitter的用户网络的社交网络。在这个属性图中,顶点表示用户,有向边表示“follows”关系。顶点中的数字代表顶点id。

1. 构造图

Spark GraphX库的Graph对象提供了从RDD构造图的工厂方法。一种常用的方法是使用一个包含元组(由一个顶点ID和一个顶点属性对象组成)的RDD和一个包含Edge对象的RDD来实例化一个Graph对象。我们将使用这个方法来构造上图中的示例图。

// 首先,需要导入所需的类
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession
......

  // case class:用来代表顶点属性,存储一个用户的信息
  case class User(name: String, age: Int)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local")
      .appName("cep demo")
      .getOrCreate()


    // 构造顶点RDDs
val users = List(
      (1L, User("Alex", 26)),
      (2L, User("Bill", 42)),
      (3L, User("Carol", 18)),
      (4L, User("Dave", 16)),
      (5L, User("Eve", 45)),
      (6L, User("Farell", 30)),
      (7L, User("Garry", 32)),
      (8L, User("Harry", 36)),
      (9L, User("Ivan", 28)),
      (10L, User("Jill", 48))
    )
    val usersRDD = spark.sparkContext.parallelize(users)

    // 构造边RDDs
    val follows = List( Edge(1L, 2L, 1),
      Edge(2L, 3L, 1),
      Edge(3L, 1L, 1),
      Edge(3L, 4L, 1),
      Edge(3L, 5L, 1),
      Edge(4L, 5L, 1),
      Edge(6L, 5L, 1),
      Edge(7L, 6L, 1),
      Edge(6L, 8L, 1),
      Edge(7L, 8L, 1),
      Edge(7L, 9L, 1),
      Edge(9L, 8L, 1),
      Edge(8L, 10L, 1),
      Edge(10L, 9L, 1),
      Edge(1L, 11L, 1)
    )
    val followsRDD = spark.sparkContext.parallelize(follows)

    // 创建默认属性集(id为11的顶点没有任何属性)
    // 它会将默认属性分配给没有被显式分配任何属性的顶点
    val defaultUser = User("NA", 0)

    // 构造图对象
    val socialGraph = Graph(usersRDD, followsRDD, defaultUser)
  }

前面代码片段中的Graph对象从顶点RDD和边RDD创建Graph类的实例。

2. 使用GraphX API查看图属性

正如我们所说,在GraphX中表示图的主类是Graph。但是还有一个GraphOps类,它的方法被隐式添加到了Graph对象。其中包括获取顶点节点数量、入度、出度等的属性和方法。

GraphOps API:查看

下面查看图的一些属性:

    // 边的数量
    val numEdges = socialGraph.numEdges
    println("边的数量:" + numEdges)  

    // 顶点的数量
    val numVertices = socialGraph.numVertices
    println("顶点的数量:" + numVertices)

    // 顶点的入度
    val inDegrees = socialGraph.inDegrees
    println("顶点的入度:")
    inDegrees.collect.foreach(println)   

    // 顶点的出度
    val outDegrees = socialGraph.outDegrees
    println("顶点的出度:")
    outDegrees.collect.foreach(println) 

    // 顶点的出入度总和
    val degrees = socialGraph.degrees
    println("顶点的出入度总和:")
    degrees.collect.foreach(println)

输出如下:

边的数量:15
顶点的数量:11

顶点的入度:
(4,1)
(11,1)
(1,1)
(6,1)
(3,1)
(9,2)
(8,3)
(10,1)
(5,3)
(2,1)

顶点的出度:
(4,1)
(1,2)
(6,2)
(3,3)
(7,3)
(9,1)
(8,1)
(10,1)
(2,1)

顶点的出入度总和:
(4,2)
(11,1)
(1,3)
(6,3)
(3,4)
(7,3)
(9,3)
(8,4)
(10,2)
(5,3)
(2,2)

获得属性图中顶点的集合视图,代码如下:

    val vertices = socialGraph.vertices     // 获得所有的顶点
    println("属性图中顶点的集合视图:")
    vertices.collect.foreach(println)

输出如下:

属性图中顶点的集合视图:
(4,User(Dave,16))
(11,User(NA,0))
(1,User(Alex,26))
(6,User(Farell,30))
(3,User(Carol,18))
(7,User(Garry,32))
(9,User(Ivan,28))
(8,User(Harry,36))
(10,User(Jill,48))
(5,User(Eve,45))
(2,User(Bill,42))

获得属性图中边的集合视图,代码如下:

    val edges = socialGraph.edges           // 获得所有的边
    println("属性图中边的集合视图:")
    edges.collect.foreach(println)

输出如下:

属性图中边的集合视图:
Edge(1,2,1)
Edge(1,11,1)
Edge(2,3,1)
Edge(3,1,1)
Edge(3,4,1)
Edge(3,5,1)
Edge(4,5,1)
Edge(6,5,1)
Edge(6,8,1)
Edge(7,6,1)
Edge(7,8,1)
Edge(7,9,1)
Edge(8,10,1)
Edge(9,8,1)
Edge(10,9,1)

在org.apache.spark.graphx包中有一个EdgeTriplet类,它的一个实例表示一条边和该边连接的两个顶点的组合,存储一条边的属性和它连接的两个顶点。EdgeTriplet类继承自Edge类,增加了分别包含源属性和目标属性的srcAttr和dstAttr成员。它还包含边的源顶点和目标顶点的惟一标识符。

// 获得属性图中triplet集合视图
val triplets = socialGraph.triplets
// triplets.take(3)

// 输出源顶点ID、源顶点属性name、目标顶点ID和目标顶点属性name
triplets.take(3).foreach{ t => 
    println(t.srcId + "," + t.srcAttr.name + " ---> " + t.dstId + "," + t.dstAttr.name)  
}
// triplets.collect.foreach(println)

输出如下:

1,Alex ---> 2,Bill
1,Alex ---> 11,NA
2,Bill ---> 3,Carol

《PySpark原理深入与编程实战》