GraphX内置图算法1:预处理数据集

GraphX为一些常见的图形算法提供了内置实现。这些算法可以作为对Graph类的方法调用。使用图算法可以很好地解决许多问题。

在本节中,我们将介绍几个Spark GraphX所实现了的图算法:

  • 最短路径算法—找到一组顶点的最短路径。
  • Page Rank算法—根据进出的边数来计算图中顶点的相对重要性。
  • 连通组件算法—找到不同的子图,如果它们存在于图中的话。
  • 强连通的组件算法—找到双连通顶点的集群。

预处理数据集

在本节中,将使用的数据集可以从斯坦福大学获得。这个数据集包含了维基百科文章的一个子集,对于本节,我们只需要其中的两个:articles.tsv和links.tsv。其中articles.tsv包含唯一的文章名称(每行一个),links.tsv包含与源和目标条目名称的链接,由制表符分隔。现要求用户将两篇文章用尽可能少的链接连接到一起。

首先对该数据集进行预处理,以构建图模型。在下面的代码中,先加载内容是文章名称的articles.tsv文件,删除空行和注释行,然后使用zipWithIndex,为每个文章名称分配一个惟一的编号(ID):

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("graphx demo")
      .getOrCreate()

    // 创建以文章名称为顶点的RDD
    // 删除空行和注释行,然后使用zipWithIndex,为每个文章名称分配ID
    val articlesFile = "src/main/data/wikispeedia/articles.tsv"
    val articles = spark.sparkContext
      .textFile(articlesFile)
      .filter(line => line.trim() != "" && !line.startsWith("#"))
      .zipWithIndex()
      .cache()
    articles.take(3).foreach(println)
}

输出内容如下:

(%C3%81ed%C3%A1n_mac_Gabr%C3%A1in,0)
(%C3%85land,1)
(%C3%89douard_Manet,2)

类似地,加载内容是文章链接的links.tsv,代码如下:

// 加载内容是文章链接的links.tsv,并删除空行和注释行
val linksFile = "src/main/data/wikispeedia/links.tsv"
val links = spark.sparkContext
      .textFile(linksFile)
      .filter(line => line.trim() != "" && !line.startsWith("#"))
links.take(3).foreach(println)

输出内容如下:

%C3%81ed%C3%A1n_mac_Gabr%C3%A1in	Bede
%C3%81ed%C3%A1n_mac_Gabr%C3%A1in	Columba
%C3%81ed%C3%A1n_mac_Gabr%C3%A1in	D%C3%A1l_Riata

然后,解析每个链接行以获得两个链接的文章名称,然后通过join连接articles RDD中的文章名称,使用文章ID来替换每个名称:

val linkIndexes = links
      .map(x => {
        val alink = x.split("\t")
        (alink(0),alink(1))
      })
      .join(articles)                   // (文章名称,(链接文章名称,文章索引))
      .map(x => x._2)                   // (链接文章名称,文章索引)
      .join(articles)                   // (链接文章名称,(文章索引,链接文章索引))
      .map(x => x._2)                   // (文章索引,链接文章索引)

linkIndexes.take(3).foreach(println)

由此产生的RDD包含元组(元组包括源和目标的文章索引),输出内容如下:

(1889,1794)
(241,1794)
(2789,1794)

接下来,我们就可以用它来构建一个Graph对象模型。这里,我们使用了fromEdgeTuples方法,它的作用是从边集合(被编码为顶点ID对)中构造一个图,该图带有边属性(其中包含重复边的计数或1(如果uniqueEdges为None)),以及顶点属性(包含每个顶点的总度数)。代码如下:

// 从被编码为顶点ID对的边集合中构造一个图
val wikigraph = Graph.fromEdgeTuples(linkIndexes, 0);    // 0为顶点属性

// 查看顶点和边属性
val triplets = wikigraph.triplets

// 输出源顶点ID、源顶点属性name、目标顶点ID和目标顶点属性name
triplets.take(3).foreach{ t =>
   println(t.srcId + "," + t.srcAttr + " ---> " + t.dstId + "," + t.dstAttr + ": " + t.attr)
}

输出内容如下:

0,0 ---> 530,0: 1
0,0 ---> 1115,0: 1
0,0 ---> 2173,0: 1

查看文章的数量和图中顶点的数量:

println("文章的数量:" + articles.count())
println("图中顶点的数量:" + wikigraph.vertices.count())

输出结果如下:

文章的数量:4604
图中顶点的数量:4592

可以看到在图中,文章和顶点的数量有细微的差别。这是因为链接文件中缺少一些文章。这可以通过在linkIndexes RDD中统计所有唯一文章名称来检查:

val uniqueArticles = linkIndexes.map(x => x._1)
      .union(linkIndexes.map(x => x._2))
      .distinct()
      .count()
println(uniqueArticles)

输出内容如下:

4592

《Flink原理深入与编程实战》