GraphX内置图算法1:预处理数据集
GraphX为一些常见的图形算法提供了内置实现。这些算法可以作为对Graph类的方法调用。使用图算法可以很好地解决许多问题。
在本节中,我们将介绍几个Spark GraphX所实现了的图算法:
- 最短路径算法—找到一组顶点的最短路径。
- Page Rank算法—根据进出的边数来计算图中顶点的相对重要性。
- 连通组件算法—找到不同的子图,如果它们存在于图中的话。
- 强连通的组件算法—找到双连通顶点的集群。
预处理数据集
在本节中,将使用的数据集可以从斯坦福大学获得。这个数据集包含了维基百科文章的一个子集,对于本节,我们只需要其中的两个:articles.tsv和links.tsv。其中articles.tsv包含唯一的文章名称(每行一个),links.tsv包含与源和目标条目名称的链接,由制表符分隔。现要求用户将两篇文章用尽可能少的链接连接到一起。
首先对该数据集进行预处理,以构建图模型。在下面的代码中,先加载内容是文章名称的articles.tsv文件,删除空行和注释行,然后使用zipWithIndex,为每个文章名称分配一个惟一的编号(ID):
def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[*]") .appName("graphx demo") .getOrCreate() // 创建以文章名称为顶点的RDD // 删除空行和注释行,然后使用zipWithIndex,为每个文章名称分配ID val articlesFile = "src/main/data/wikispeedia/articles.tsv" val articles = spark.sparkContext .textFile(articlesFile) .filter(line => line.trim() != "" && !line.startsWith("#")) .zipWithIndex() .cache() articles.take(3).foreach(println) }
输出内容如下:
(%C3%81ed%C3%A1n_mac_Gabr%C3%A1in,0) (%C3%85land,1) (%C3%89douard_Manet,2)
类似地,加载内容是文章链接的links.tsv,代码如下:
// 加载内容是文章链接的links.tsv,并删除空行和注释行 val linksFile = "src/main/data/wikispeedia/links.tsv" val links = spark.sparkContext .textFile(linksFile) .filter(line => line.trim() != "" && !line.startsWith("#")) links.take(3).foreach(println)
输出内容如下:
%C3%81ed%C3%A1n_mac_Gabr%C3%A1in Bede %C3%81ed%C3%A1n_mac_Gabr%C3%A1in Columba %C3%81ed%C3%A1n_mac_Gabr%C3%A1in D%C3%A1l_Riata
然后,解析每个链接行以获得两个链接的文章名称,然后通过join连接articles RDD中的文章名称,使用文章ID来替换每个名称:
val linkIndexes = links .map(x => { val alink = x.split("\t") (alink(0),alink(1)) }) .join(articles) // (文章名称,(链接文章名称,文章索引)) .map(x => x._2) // (链接文章名称,文章索引) .join(articles) // (链接文章名称,(文章索引,链接文章索引)) .map(x => x._2) // (文章索引,链接文章索引) linkIndexes.take(3).foreach(println)
由此产生的RDD包含元组(元组包括源和目标的文章索引),输出内容如下:
(1889,1794) (241,1794) (2789,1794)
接下来,我们就可以用它来构建一个Graph对象模型。这里,我们使用了fromEdgeTuples方法,它的作用是从边集合(被编码为顶点ID对)中构造一个图,该图带有边属性(其中包含重复边的计数或1(如果uniqueEdges为None)),以及顶点属性(包含每个顶点的总度数)。代码如下:
// 从被编码为顶点ID对的边集合中构造一个图 val wikigraph = Graph.fromEdgeTuples(linkIndexes, 0); // 0为顶点属性 // 查看顶点和边属性 val triplets = wikigraph.triplets // 输出源顶点ID、源顶点属性name、目标顶点ID和目标顶点属性name triplets.take(3).foreach{ t => println(t.srcId + "," + t.srcAttr + " ---> " + t.dstId + "," + t.dstAttr + ": " + t.attr) }
输出内容如下:
0,0 ---> 530,0: 1 0,0 ---> 1115,0: 1 0,0 ---> 2173,0: 1
查看文章的数量和图中顶点的数量:
println("文章的数量:" + articles.count()) println("图中顶点的数量:" + wikigraph.vertices.count())
输出结果如下:
文章的数量:4604 图中顶点的数量:4592
可以看到在图中,文章和顶点的数量有细微的差别。这是因为链接文件中缺少一些文章。这可以通过在linkIndexes RDD中统计所有唯一文章名称来检查:
val uniqueArticles = linkIndexes.map(x => x._1) .union(linkIndexes.map(x => x._2)) .distinct() .count() println(uniqueArticles)
输出内容如下:
4592