图结构处理示例

在下面这个图结构处理示例中,我们创建一个图,然后删除缺失的顶点,并合并边属性。

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession
......

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("graphx demo")
      .getOrCreate()

    // 创建顶点RDD
    val users = spark.sparkContext.parallelize(
      Array(
        (3L, ("rxin", "student")),
        (7L, ("jgonzal", "postdoc")),
        (5L, ("franklin", "prof")),
        (2L, ("istoica", "prof")),
        (4L, ("peter", "student"))
      )
    )
    // 创建边RDD
    val relationships = spark.sparkContext.parallelize(
      Array(
        Edge(3L, 7L, "collab"),
        Edge(3L, 7L, "colleague"),
        Edge(5L, 3L, "advisor"),
        Edge(2L, 5L, "colleague"),
        Edge(5L, 7L, "pi"),
        Edge(4L, 0L, "student"),
        Edge(5L, 0L, "colleague")
      )
    )
    // 定义一个默认用户,以防与缺失的用户有关系
    val defaultUser = ("John Doe", "Missing")

    // 构建初始图
    val graph = Graph(users, relationships, defaultUser)

    // 注意有一个用户0 (我们没有它的信息)连接到用户4 (peter) 和 5 (franklin).
    graph.triplets.map(
      triplet => triplet.srcAttr._1 + "是" + triplet.dstAttr._1 + "的" + triplet.attr
    ).collect.foreach(println)

    // 删除缺失的顶点以及连接到它们的边
    // // 由于带有无效顶点的边也是无效的,因此过滤掉那些顶点并创建一个有效的图。
    val validGraph = graph.subgraph(vpred = (vertexId, attribute) => attribute._2 != "Missing")

    // 有效的子图将通过删除用户0断开用户4和用户5的连接
    println("--------------------------------------")
    validGraph.vertices.collect.foreach(println)

    println("--------------------------------------")
    validGraph.triplets.map(
      triplet => triplet.srcAttr._1 + "是" + triplet.dstAttr._1 + "的" + triplet.attr
    ).collect.foreach(println(_))

    // 合并并行边,使用groupEdges结构操作
    // 导入分区策略类
    import org.apache.spark.graphx.PartitionStrategy._

    // 对用户图进行分区,这是对边进行分组必须的
    /* CanonicalRandomVertexCut分区策略确保两个顶点之间的所有边发生共存,而不受任何方向影响。*/
    val partitionedUserGraph = validGraph.partitionBy(CanonicalRandomVertexCut)

    // 生成没有平行边的图形,并组合合重复边的属性
    val graphWithoutParallelEdges = partitionedUserGraph.groupEdges((e1,e2) => e1 + " and " + e2)

    // 输出详细信息
    println("--------------------------------------")
    graphWithoutParallelEdges.triplets.collect.foreach(println)
  }

输出结果如下:

rxin是jgonzal的collab
rxin是jgonzal的colleague
franklin是rxin的advisor
istoica是franklin的colleague
franklin是jgonzal的pi
peter是John Doe的student
franklin是John Doe的colleague
--------------------------------------
(2,(istoica,prof))
(3,(rxin,student))
(4,(peter,student))
(5,(franklin,prof))
(7,(jgonzal,postdoc))
--------------------------------------
rxin是jgonzal的collab
rxin是jgonzal的colleague
franklin是rxin的advisor
istoica是franklin的colleague
franklin是jgonzal的pi
--------------------------------------
((5,(franklin,prof)),(7,(jgonzal,postdoc)),pi)
((3,(rxin,student)),(7,(jgonzal,postdoc)),collab and colleague)
((2,(istoica,prof)),(5,(franklin,prof)),colleague)
((5,(franklin,prof)),(3,(rxin,student)),advisor)

《PySpark原理深入与编程实战》