使用GraphX API操作图

正如我们所说,在GraphX中表示图的主类是Graph。但是还有一个GraphOps类,它的方法被隐式添加到了Graph对象。在确定要使用哪个方法时,我们需要考虑这两个类。在这一节中提到的一些方法来自Graph类,还有一些方法来自于GraphOps类。

1. 属性转换操作

Graphx类提供有一个高阶函数mapVertices,用来对顶点属性进行转换,其参数函数返回一个新的顶点属性。mapVertices用来更新顶点属性。从图的构建我们知道,顶点属性保存在边分区中,所以我们需要改变的是边分区中的属性。例如,我们需要将所有表示用户信息的顶点属性user的年龄增加1岁,执行代码如下:

// 应用mapVertices方法对顶点属性进行转换
val updatedAges = socialGraph.mapVertices( (vertexId, user) => User(user.name, user.age + 1 ))

println("修改之前的顶点属性:")
socialGraph.vertices.take(5).foreach(println)         // 修改之前的顶点

println("修改之后的顶点属性:")
updatedAges.vertices.take(5).foreach(println)         // 修改之后的顶点

输出结果如下:

修改之前的顶点属性:
(4,User(Dave,16))
(11,User(NA,0))
(1,User(Alex,26))
(6,User(Farell,30))
(3,User(Carol,18))

修改之后的顶点属性:
(4,User(Dave,17))
(11,User(NA,1))
(1,User(Alex,27))
(6,User(Farell,31))
(3,User(Carol,19))

Graphx类还提供有一个高阶函数mapEdges,用来对边属性进行转换,其参数函数返回一个新的边属性。mapEdges方法用来更新边属性,它需要一个map函数,它接受分区ID和该分区中的边的迭代器,并返回转换后的迭代器(其中对于每个输入边都包含一个新edge属性对象(而不是新的edge)。下面的代码使用mapEdges方法将每个边的属性从整数修改为字符串:

// 使用mapEdges方法将每个边的属性从整数修改为字符串
val followsGraph = socialGraph.mapEdges( (n) => "follows")

println("修改之前的边属性:")
socialGraph.edges.take(5).foreach(println)

println("修改之后的边属性:")
followsGraph.edges.take(5).foreach(println)

输出结果如下:

修改之前的边属性:
Edge(1,2,1)
Edge(1,11,1)
Edge(2,3,1)
Edge(3,1,1)
Edge(3,4,1)

修改之后的边属性:
Edge(1,2,follows)
Edge(1,11,follows)
Edge(2,3,follows)
Edge(3,1,follows)
Edge(3,4,follows)

Graphx类还提供有一个高阶函数mapTriplets,用来更新边属性。它用在转换边属性时如果需要相邻的顶点值的情况下,使用map函数一次转换一个分区的每个边属性,并将相邻顶点属性传递给分区。下面的代码修改源图中的边属性,并返回一个新的边属性:

// 如果追随者的年龄大于30,将边属性更改为2
val weightedGraph = socialGraph.mapTriplets{ t =>
   if (t.srcAttr.age >= 30) 2 else 1
}

println("修改之前的边属性:")
socialGraph.edges.take(10).foreach(println)

println("修改之后的边属性:")
weightedGraph.edges.take(10).foreach(println)

输出结果如下:

修改之前的边属性:
Edge(1,2,1)
Edge(1,11,1)
Edge(2,3,1)
Edge(3,1,1)
Edge(3,4,1)
Edge(3,5,1)
Edge(4,5,1)
Edge(6,5,1)
Edge(6,8,1)
Edge(7,6,1)

修改之后的边属性:
Edge(1,2,1)
Edge(1,11,1)
Edge(2,3,2)
Edge(3,1,1)
Edge(3,4,1)
Edge(3,5,1)
Edge(4,5,1)
Edge(6,5,2)
Edge(6,8,2)
Edge(7,6,2)

2. 结构转换操作

Graphx类还提供有对整个结构进行转换的方法,例如反转一个图结构,或者从一个给定的图中提取子图。在Spark GraphX库中的这些结构操作,允许用户将图作为一个单独的单元处理,生成一个新的图。其中reverse方法反转属性图中所有边的方向,它返回一个新的属性图。如下面的代码所示:

// 对整个图结构进行反转
val reverseGraph = socialGraph.reverse

// 原来的图结构
println("转换之前的图结构:")
reverseGraph.triplets
      .map{ t => t.srcAttr.name + " follows " + t.dstAttr.name}
      .take(10)
      .foreach(println)

// 反转以后的图结构
println("转换之后的图结构:")
socialGraph.triplets
      .map{ t => t.srcAttr.name + " follows " + t.dstAttr.name}
      .take(10)     
      .foreach(println)

输出结果如下:

转换之前的图结构:
Alex follows Carol
Bill follows Alex
Carol follows Bill
Dave follows Carol
Eve follows Carol
Eve follows Dave
Eve follows Farell
Farell follows Garry
Harry follows Farell
Harry follows Garry

转换之后的图结构:
Alex follows Bill
Alex follows NA
Bill follows Carol
Carol follows Alex
Carol follows Dave
Carol follows Eve
Dave follows Eve
Farell follows Eve
Farell follows Harry
Garry follows Farell

Graphx类还提供了groupEdges高阶函数,用于将两个顶点之间的多条边合并为一条边。为了得到正确的结果,必须使用partitionBy对图进行分区。

def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local")
      .appName("cep demo")
      .getOrCreate()

    // 构造一个具有多边的图
    val multiEdges = List(
      Edge(1L, 2L, 100),
      Edge(1L, 2L, 200),
      Edge(2L, 3L, 300),
      Edge(2L, 3L, 400),
      Edge(3L, 1L, 200),
      Edge(3L, 1L, 300)
    )

    val multiEdgesRDD = spark.sparkContext.parallelize(multiEdges)

    val defaultVertexProperty = 1

    // 从边集合构造图形。其中Graph的静态方式.fromEdges的作用是:
    // 返回一个图,带有给定的multiEdgesRDD所描述的边属性,并且所有的顶点是指定的默认值
    val multiEdgeGraph = Graph.fromEdges(multiEdgesRDD, defaultVertexProperty)

    // 请注意,groupEdges方法要求并行边位于同一分区上。
    import org.apache.spark.graphx.PartitionStrategy._

    // 分区策略:CanonicalRandomVertexCut,它通过Hash散列源顶点id和目标顶点id为分区分配边。
    // 它共同定位两个顶点之间的所有边。
    val repartitionedGraph = multiEdgeGraph.partitionBy(CanonicalRandomVertexCut)
    val singleEdgeGraph = repartitionedGraph.groupEdges((edge1, edge2) => edge1 + edge2)

    println("边合并之前:")
    multiEdgeGraph.edges.collect.foreach(println)

    println("\n边合并之后:")
    singleEdgeGraph.edges.collect.foreach(println)
}

输出结果如下:

边合并之前:
Edge(1,2,100)
Edge(1,2,200)
Edge(2,3,300)
Edge(2,3,400)
Edge(3,1,200)
Edge(3,1,300)

边合并之后:
Edge(1,2,300)
Edge(2,3,700)
Edge(3,1,500)

groupEdges()操作合并多重图中的并行边(如顶点对之间重复的边)。在大量的应用程序中,并行的边可以合并(它们的权重合并)为一条边从而降低图的大小。

选择图子集

图上的另一个重要操作是选择子图(即只选择图的一部分)。有三种方法来实现这一点:

  • subgraph:根据提供的谓词选择顶点和边。
  • mask:只选择在另一张图中显示的顶点。
  • filter:前两个的组合。

Graphx类提供了subgraph方法,对每个顶点和边应用用户指定的过滤器。它返回源图的子图。概念上,它类似于RDD filter方法。下面是subgraph的签名:

def subgraph(
    epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
    vpred: (VertexId, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]

该方法接收两个predicates(判断)作为参数,第一个predicates(epred)接收一个EdgeTriplet作为参数。如果它返回true,那么特定的边将被包含在结果图中。第二个 predicates(vpred)接收一个顶点ID及其属性对象作为参数。返回的子图,只包含满足这两个predicates的顶点和边。如果没有vpred函数参数,subgraph函数会在新图中保留所有原始顶点。在新图中不再存在顶点的边会自动被删除。subgraph方法的实现分两步:先过滤VertexRDD,然后再过滤EdgeRDD。

请看下面的示例:

// case class:用来代表顶点属性,存储一个用户的信息
case class User(name: String, age: Int)

def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("graphx demo")
      .getOrCreate()

    // 构造顶点RDDs
    val users = List(
      (1L, User("Alex", 26)),
      (2L, User("Bill", 42)),
      (3L, User("Carol", 18)),
      (4L, User("Dave", 16)),
      (5L, User("Eve", 45)),
      (6L, User("Farell", 30)),
      (7L, User("Garry", 32)),
      (8L, User("Harry", 36)),
      (9L, User("Ivan", 28)),
      (10L, User("Jill", 48))
    )
    val usersRDD = spark.sparkContext.parallelize(users)

    // 构造边RDDs
    val follows = List(
      Edge(1L, 2L, 1),
      Edge(2L, 3L, 1),
      Edge(3L, 1L, 1),
      Edge(3L, 4L, 1),
      Edge(3L, 5L, 1),
      Edge(4L, 5L, 1),
      Edge(6L, 5L, 1),
      Edge(7L, 6L, 1),
      Edge(6L, 8L, 1),
      Edge(7L, 8L, 1),
      Edge(7L, 9L, 1),
      Edge(9L, 8L, 1),
      Edge(8L, 10L, 1),
      Edge(10L, 9L, 1),
      Edge(1L, 11L, 1)
    )
    val followsRDD = spark.sparkContext.parallelize(follows)

    // 创建默认属性集(id为11的顶点没有任何属性)
    // 它会将默认属性分配给没有被显式分配任何属性的顶点
    val defaultUser = User("NA", 0)

    // 构造图对象
    val socialGraph = Graph(usersRDD, followsRDD, defaultUser)

    // 如果追随者的年龄大于30,将边属性更改为2
    val weightedGraph = socialGraph.mapTriplets{ t =>
      if (t.srcAttr.age >= 30) 2 else 1
    }

    // 这里只返回权重大于1的边
    val subgraph = weightedGraph.subgraph(
      edgeTriplet => edgeTriplet.attr > 1,
      (vertexId, vertexProperty) => true
    )

    println("原图如下:")
    weightedGraph.edges.take(10).foreach(println)

    println("\n子图如下:")
    subgraph.edges.take(10).foreach(println)
}

输出结果如下:

原图如下:
Edge(1,2,1)
Edge(2,3,2)
Edge(3,1,1)
Edge(3,4,1)
Edge(3,5,1)
Edge(4,5,1)
Edge(6,5,2)
Edge(6,8,2)
Edge(7,6,2)
Edge(7,8,2)

子图如下:
Edge(2,3,2)
Edge(6,5,2)
Edge(6,8,2)
Edge(7,6,2)
Edge(7,8,2)
Edge(7,9,2)
Edge(8,10,2)
Edge(10,9,2)

subgraph()操作利用顶点和边的判断式(predicates),返回的图仅仅包含满足顶点判断式的顶点、满足边判断式的边以及满足顶点判断式的triple。subgraph操作可以用于很多场景,如获取感兴趣的顶点和边组成的图或者获取清除断开连接后的图。

Graphx类还提供了mask()方法,mask()函数是在GraphX中过滤图的另一种方法。有了mask()函数,你可以把一个图投射到另一个图上,只保留在第二个图中存在的那些顶点和边,而不用考虑两个图的属性对象。mask()函数唯一的参数是第二个图。它限制图只包含另一个图(用作掩码的图)中的顶点和边,但保留此图的属,非常类似于SQL语法中的"create ... like ..."语句。请看下面的代码:

// case class:用来代表顶点属性,存储一个用户的信息
case class User(name: String, age: Int)

def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("graphx demo")
      .getOrCreate()

    // 构造顶点RDDs
    val users = List(
      (1L, User("Alex", 26)),
      (2L, User("Bill", 42)),
      (3L, User("Carol", 18)),
      (4L, User("Dave", 16)),
      (5L, User("Eve", 45)),
      (6L, User("Farell", 30)),
      (7L, User("Garry", 32)),
      (8L, User("Harry", 36)),
      (9L, User("Ivan", 28)),
      (10L, User("Jill", 48))
    )
    val usersRDD = spark.sparkContext.parallelize(users)

    // 构造边RDDs
    val follows = List(
      Edge(1L, 2L, 1),
      Edge(2L, 3L, 1),
      Edge(3L, 1L, 1),
      Edge(3L, 4L, 1),
      Edge(3L, 5L, 1),
      Edge(4L, 5L, 1),
      Edge(6L, 5L, 1),
      Edge(7L, 6L, 1),
      Edge(6L, 8L, 1),
      Edge(7L, 8L, 1),
      Edge(7L, 9L, 1),
      Edge(9L, 8L, 1),
      Edge(8L, 10L, 1),
      Edge(10L, 9L, 1),
      Edge(1L, 11L, 1)
    )
    val followsRDD = spark.sparkContext.parallelize(follows)

    // 创建默认属性集(id为11的顶点没有任何属性)
    // 它会将默认属性分配给没有被显式分配任何属性的顶点
    val defaultUser = User("NA", 0)

    // 构造图对象
    val socialGraph = Graph(usersRDD, followsRDD, defaultUser)

    // mask
    // 先构造另一个图femaleConnections
    val femaleConnections = List(
      Edge(2L, 3L, 0),
      Edge(3L, 1L, 0),
      Edge(3L, 4L, 0),
      Edge(3L, 5L, 0),
      Edge(4L, 5L, 0),
      Edge(6L, 5L, 0),
      Edge(8L, 10L, 0),
      Edge(10L, 9L, 0)
    )

    val femaleConnectionsRDD = spark.sparkContext.parallelize(femaleConnections)

    // 从边集合构造掩码图
    val femaleGraphMask = Graph.fromEdges(femaleConnectionsRDD, defaultUser)

    // 以 femaleGraphMask作为"掩码",返回在femaleGraphMask中包含的顶点的一个子图
    val femaleGraph = socialGraph.mask(femaleGraphMask)

    println("掩码图如下:")
    femaleGraphMask.triplets.take(10).foreach(println)

    println("\n子图如下:")
    femaleGraph.triplets.take(10).foreach(println)
}

输出结果如下:

掩码图如下:
((2,User(NA,0)),(3,User(NA,0)),0)
((3,User(NA,0)),(1,User(NA,0)),0)
((3,User(NA,0)),(4,User(NA,0)),0)
((3,User(NA,0)),(5,User(NA,0)),0)
((4,User(NA,0)),(5,User(NA,0)),0)
((6,User(NA,0)),(5,User(NA,0)),0)
((8,User(NA,0)),(10,User(NA,0)),0)
((10,User(NA,0)),(9,User(NA,0)),0)

子图如下:
((3,User(Carol,18)),(1,User(Alex,26)),1)
((3,User(Carol,18)),(4,User(Dave,16)),1)
((8,User(Harry,36)),(10,User(Jill,48)),1)
((10,User(Jill,48)),(9,User(Ivan,28)),1)

mask()操作构造一个子图,这个子图包含输入图中包含的顶点和边。它的实现很简单,顶点和边均做inner join操作即可。这个操作可以和subgraph()操作相结合,基于另外一个相关图的特征去约束一个图。

过滤图内容的第三个函数是filter(),它位于org.apache.spark.graphx.GraphOps类中。它与subgraph()和mask()函数都有关。该函数的签名如下:

def filter[VD2: ClassTag, ED2: ClassTag](
      preprocess: Graph[VD, ED] => Graph[VD2, ED2],
      epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
      vpred: (VertexId, VD2) => Boolean = (v: VertexId, d: VD2) => true): Graph[VD, ED] = {
    graph.mask(preprocess(graph).subgraph(epred, vpred))

它需要三个参数:一个预处理函数以及边和顶点判断函数。通过预处理函数把原来的图转换成另一个图,然后使用提供的边和顶点判断函数进行修剪。得到的图将用作原始图的mask(掩码)。换句话说,我们可以使用filter()函数将两个步骤合并到一个步骤中。

此函数可用于基于某些属性过滤图,而无需更改程序中的顶点和边值。例如,我们可以删除图中出度为0的顶点:

// case class:用来代表顶点属性,存储一个用户的信息
case class User(name: String, age: Int)

def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("graphx demo")
      .getOrCreate()

    // 构造顶点RDDs
    val users = List(
      (1L, User("Alex", 26)),
      (2L, User("Bill", 42)),
      (3L, User("Carol", 18)),
      (4L, User("Dave", 16)),
      (5L, User("Eve", 45)),
      (6L, User("Farell", 30)),
      (7L, User("Garry", 32)),
      (8L, User("Harry", 36)),
      (9L, User("Ivan", 28)),
      (10L, User("Jill", 48))
    )
    val usersRDD = spark.sparkContext.parallelize(users)

    // 构造边RDDs
    val follows = List(
      Edge(1L, 2L, 1),
      Edge(2L, 3L, 1),
      Edge(3L, 1L, 1),
      Edge(3L, 4L, 1),
      Edge(3L, 5L, 1),
      Edge(4L, 5L, 1),
      Edge(6L, 5L, 1),
      Edge(7L, 6L, 1),
      Edge(6L, 8L, 1),
      Edge(7L, 8L, 1),
      Edge(7L, 9L, 1),
      Edge(9L, 8L, 1),
      Edge(8L, 10L, 1),
      Edge(10L, 9L, 1),
      Edge(1L, 11L, 1)
    )
    val followsRDD = spark.sparkContext.parallelize(follows)

    // 创建默认属性集(id为11的顶点没有任何属性)
    // 它会将默认属性分配给没有被显式分配任何属性的顶点
    val defaultUser = User("NA", 0)

    // 构造图对象
    val socialGraph = Graph(usersRDD, followsRDD, defaultUser)

    // 删除图中出度为0的顶点
    val subGraph = socialGraph.filter(
      graph => {
        val degrees: VertexRDD[Int] = graph.outDegrees
        graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
      },
      vpred = (vid: VertexId, deg:Int) => deg > 0)


    println("子图的顶点:")
    subGraph.vertices.collect.foreach(println)
    println("\n子图的边:")
    subGraph.edges.collect.foreach(println)
}

输出结果如下:

子图的顶点:
(8,User(Harry,36))
(1,User(Alex,26))
(9,User(Ivan,28))
(10,User(Jill,48))
(2,User(Bill,42))
(3,User(Carol,18))
(4,User(Dave,16))
(6,User(Farell,30))
(7,User(Garry,32))

子图的边:
Edge(1,2,1)
Edge(2,3,1)
Edge(3,1,1)
Edge(3,4,1)
Edge(6,8,1)
Edge(7,6,1)
Edge(7,8,1)
Edge(7,9,1)
Edge(8,10,1)
Edge(9,8,1)
Edge(10,9,1)

3.连接操作

在许多情况下,有必要将外部数据加入到图中。例如,我们可能有额外的用户属性需要合并到已有的图中或者我们可能想从一个图中取出顶点特征加入到另外一个图中。这些任务可以用join连接操作完成。连接操作用来更新现有属性或向图中的顶点添加新属性。

在org.apache.spark.graphx.GraphOps类中,定义了一个joinVertices方法,它使用一个新的顶点集合(作为输入提供给源图)更新源图中的顶点,没有匹配的顶点保留其原始值。它有以下签名:

def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD) : Graph[VD, ED]

例如,假设顶点3和4的用户的年龄不正确,使用joinVertices方法来修改这两个用户的年龄。

// case class:用来代表顶点属性,存储一个用户的信息
case class User(name: String, age: Int)

def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("graphx demo")
      .getOrCreate()

    // 构造顶点RDDs
    val users = List(
      (1L, User("Alex", 26)),
      (2L, User("Bill", 42)),
      (3L, User("Carol", 18)),
      (4L, User("Dave", 16)),
      (5L, User("Eve", 45)),
      (6L, User("Farell", 30)),
      (7L, User("Garry", 32)),
      (8L, User("Harry", 36)),
      (9L, User("Ivan", 28)),
      (10L, User("Jill", 48))
    )
    val usersRDD = spark.sparkContext.parallelize(users)

    // 构造边RDDs
    val follows = List(
      Edge(1L, 2L, 1),
      Edge(2L, 3L, 1),
      Edge(3L, 1L, 1),
      Edge(3L, 4L, 1),
      Edge(3L, 5L, 1),
      Edge(4L, 5L, 1),
      Edge(6L, 5L, 1),
      Edge(7L, 6L, 1),
      Edge(6L, 8L, 1),
      Edge(7L, 8L, 1),
      Edge(7L, 9L, 1),
      Edge(9L, 8L, 1),
      Edge(8L, 10L, 1),
      Edge(10L, 9L, 1),
      Edge(1L, 11L, 1)
    )
    val followsRDD = spark.sparkContext.parallelize(follows)

    // 创建默认属性集(id为11的顶点没有任何属性)
    // 它会将默认属性分配给没有被显式分配任何属性的顶点
    val defaultUser = User("NA", 0)

    // 构造图对象
    val socialGraph = Graph(usersRDD, followsRDD, defaultUser)

    // 先创建一个作为输入提供给源图的顶点集合
    val correctAges = spark.sparkContext.parallelize(List((3L, 28), (4L, 26)))

    // 使用correctAges来更新源图中的顶点
    val correctedGraph = socialGraph
      .joinVertices(correctAges)((id, user, correctAge) => User(user.name, correctAge))

    println("原来的顶点:")
    socialGraph.vertices.collect.foreach(println)
    println("更新后的顶点:")
    correctedGraph.vertices.collect.foreach(println)

    println("--------------------------------------------------------------")
    // 比较两个图中顶点id为3和4的属性
    val incorrectSubGraph = socialGraph.subgraph(
      vpred = (vertexId, vertexProperty) => (vertexId == 3) || (vertexId == 4)
    )
    val correctedSubGraph = correctedGraph.subgraph(
      vpred = (vertexId, vertexProperty) => (vertexId == 3) || (vertexId == 4)
    )

    println("错误的子图:")
    incorrectSubGraph.vertices.collect.foreach(println)
    println("正确的子图:")
    correctedSubGraph.vertices.collect.foreach(println)
}

输出结果如下:

原来的顶点:
(8,User(Harry,36))
(1,User(Alex,26))
(9,User(Ivan,28))
(10,User(Jill,48))
(2,User(Bill,42))
(11,User(NA,0))
(3,User(Carol,18))
(4,User(Dave,16))
(5,User(Eve,45))
(6,User(Farell,30))
(7,User(Garry,32))
更新后的顶点:
(8,User(Harry,36))
(1,User(Alex,26))
(9,User(Ivan,28))
(10,User(Jill,48))
(2,User(Bill,42))
(11,User(NA,0))
(3,User(Carol,28))
(4,User(Dave,26))
(5,User(Eve,45))
(6,User(Farell,30))
(7,User(Garry,32))
--------------------------------------------------------------
错误的子图:
(3,User(Carol,18))
(4,User(Dave,16))
正确的子图:
(3,User(Carol,28))
(4,User(Dave,26))

在org.apache.spark.graphx.Graph类中还提供了一个outerJoinVertices()方法,用来向源图中的顶点添加新属性。这个函数用于根据外部数据用新的值更新顶点。它有以下签名:

def outerJoinVertices[U:ClassTag, VD2:ClassTag](other: RDD[(VertexId, U)])
     (mapFunc: (VertexId, VD, Option[U]) => VD2) : Graph[VD2, ED]

这个函数需要提供两个参数:一个RDD(包含带有顶点IDs和新顶点对象的元组),以及一个映射函数(将旧的顶点属性对象(类型VD)和来自输入RDD(类型U)的新顶点对象组合在一起)。如果在输入RDD中对于某个特定的顶点ID没有对象存在,则该映射函数接收None。

现在假设我们想向源图中的每个用户增加一个新的city属性,请看下面的示例代码:

import org.apache.spark.graphx._
import org.apache.spark.sql.SparkSession
......

  // case class
  // 用来代表顶点属性,存储一个用户的信息
  case class User(name: String, age: Int)
  // 用来代表具有city属性的用户信息
  case class UserWithCity(name: String, age: Int, city: String)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("graphx demo")
      .getOrCreate()

    // 构造顶点RDDs
    val users = List(
      (1L, User("Alex", 26)),
      (2L, User("Bill", 42)),
      (3L, User("Carol", 18)),
      (4L, User("Dave", 16)),
      (5L, User("Eve", 45)),
      (6L, User("Farell", 30)),
      (7L, User("Garry", 32)),
      (8L, User("Harry", 36)),
      (9L, User("Ivan", 28)),
      (10L, User("Jill", 48))
    )
    val usersRDD = spark.sparkContext.parallelize(users)

    // 构造边RDDs
    val follows = List(
      Edge(1L, 2L, 1),
      Edge(2L, 3L, 1),
      Edge(3L, 1L, 1),
      Edge(3L, 4L, 1),
      Edge(3L, 5L, 1),
      Edge(4L, 5L, 1),
      Edge(6L, 5L, 1),
      Edge(7L, 6L, 1),
      Edge(6L, 8L, 1),
      Edge(7L, 8L, 1),
      Edge(7L, 9L, 1),
      Edge(9L, 8L, 1),
      Edge(8L, 10L, 1),
      Edge(10L, 9L, 1),
      Edge(1L, 11L, 1)
    )
    val followsRDD = spark.sparkContext.parallelize(follows)

    // 创建默认属性集(id为11的顶点没有任何属性)
    // 它会将默认属性分配给没有被显式分配任何属性的顶点
    val defaultUser = User("NA", 0)

    // 构造图对象
    val socialGraph = Graph(usersRDD, followsRDD, defaultUser)

    // 外部数据
    val userCities = spark.sparkContext.parallelize(
      List(
        (1L, "Boston"),
        (3L, "New York"),
        (5L, "London"),
        (7L, "Bombay"),
        (9L, "Tokyo"),
        (10L, "Palo Alto")
      )
    )

    // 使用外部数据更新源图中的顶点
    val socialGraphWithCity = socialGraph
      .outerJoinVertices(userCities)(
        (id, user, cityOpt) =>
          cityOpt match {
            case Some(city) => UserWithCity(user.name, user.age, city)
            case None => UserWithCity(user.name, user.age, "NA")
          }
      )

    // 查看更新之后的顶点
    println("更新之后的顶点:")
    socialGraphWithCity.vertices.collect.foreach(println)
  }

输出结果如下:

更新之后的顶点:
(8,UserWithCity(Harry,36,NA))
(1,UserWithCity(Alex,26,Boston))
(9,UserWithCity(Ivan,28,Tokyo))
(10,UserWithCity(Jill,48,Palo Alto))
(2,UserWithCity(Bill,42,NA))
(11,UserWithCity(NA,0,NA))
(3,UserWithCity(Carol,18,New York))
(4,UserWithCity(Dave,16,NA))
(5,UserWithCity(Eve,45,London))
(6,UserWithCity(Farell,30,NA))
(7,UserWithCity(Garry,32,Bombay))

4.聚合操作

GraphX中提供的聚合操作有三个,分别是aggregateMessages()、collectNeighborIds()和collectNeighbors()。

在org.apache.spark.graphx.Graph类中提供了aggregateMessages方法,用来从相邻顶点和连接边聚合每个顶点的值。它返回顶点id和聚合消息的pair RDD。这个方法使用两个用户定义的函数来进行聚合,并为属性图中的每个三元组调用这些函数。

通过在边级别上表示计算,我们实现了最大的并行性。这是图API中支持邻域级计算的核心函数之一。它有以下方法签名:

def aggregateMessages[A: ClassTag](
     sendMsg: EdgeContext[VD, ED, A] => Unit,
     mergeMsg: (A, A) => A,
     tripletFields: TripletFields = TripletFields.All): VertexRDD[A]

它的主要功能是向邻边发消息,合并邻边收到的消息,返回messageRDD。该接口有三个参数,分别为发消息函数、合并消息函数以及tripletFields属性。

其中sendMsg函数为图中的每条边接收一个EdgeContext对象,如果需要,则使用该EdgeContext向顶点发送消息。EdgeContext对象包含源和目标顶点的ID和属性对象、边的属性对象、以及向邻近顶点发送消息的两种方法:sendToSrc和sendToDst。这个sendMsg函数可以使用EdgeContext来决定发送给每个顶点的消息。

而mergeMsg函数聚合去向同一顶点的消息。该函数用于在每个edge分区中每个顶点收到的消息合并,并且它还用于合并不同分区vertexId相同的消息。

最后,tripletFields参数指定应该提供哪些字段作为EdgeContext的一部分。可能的值是TripletFields类中的静态字段(None,EdgeOnly,Src,Dst,和All),默认是TripletFields.ALL。

方法aggregateMessages 的使用分两步:

    >
  • 在第一步(方法的第一个参数)中,消息被发送到目标顶点或源顶点(类似于MapReduce中的Map函数)
  • 在第二步(方法的第二个参数)中,聚合是在这些消息上完成的(类似于MapReduce中的Reduce函数)。

在下面的示例中,我们计算社交图中每个用户的关注者中最年长的追随者的年龄(即粉丝的最大年龄):

import org.apache.spark.graphx._
import org.apache.spark.sql.SparkSession
......

  // case class,用来代表顶点属性,存储一个用户的信息
  case class User(name: String, age: Int)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("graphx demo")
      .getOrCreate()

    // 构造顶点RDDs
    val users = List(
      (1L, User("Alex", 26)),
      (2L, User("Bill", 42)),
      (3L, User("Carol", 18)),
      (4L, User("Dave", 16)),
      (5L, User("Eve", 45)),
      (6L, User("Farell", 30)),
      (7L, User("Garry", 32)),
      (8L, User("Harry", 36)),
      (9L, User("Ivan", 28)),
      (10L, User("Jill", 48))
    )
    val usersRDD = spark.sparkContext.parallelize(users)

    // 构造边RDDs
    val follows = List(
      Edge(1L, 2L, 1),
      Edge(2L, 3L, 1),
      Edge(3L, 1L, 1),
      Edge(3L, 4L, 1),
      Edge(3L, 5L, 1),
      Edge(4L, 5L, 1),
      Edge(6L, 5L, 1),
      Edge(7L, 6L, 1),
      Edge(6L, 8L, 1),
      Edge(7L, 8L, 1),
      Edge(7L, 9L, 1),
      Edge(9L, 8L, 1),
      Edge(8L, 10L, 1),
      Edge(10L, 9L, 1),
      Edge(1L, 11L, 1)
    )
    val followsRDD = spark.sparkContext.parallelize(follows)

    // 创建默认属性集(id为11的顶点没有任何属性)
    // 它会将默认属性分配给没有被显式分配任何属性的顶点
    val defaultUser = User("NA", 0)

    // 构造图对象
    val socialGraph = Graph(usersRDD, followsRDD, defaultUser)

    // 如何找到每个用户最年长的追随者的年龄
    val oldestFollower = socialGraph.aggregateMessages[Int](
      sendMsg = ec => ec.sendToDst(ec.srcAttr.age),     	// 向邻近顶点发送消息
      mergeMsg = (x, y) => math.max(x,y),               	// 合并发送到本顶点的消息
      tripletFields = TripletFields.All                 	// 可省略(使用默认值)
    )
    oldestFollower.collect.foreach(println)
  }

输出结果如下:

(8,32)
(1,18)
(9,48)
(10,36)
(2,26)
(11,26)
(3,42)
(4,18)
(5,30)
(6,32)

《PySpark原理深入与编程实战》