GraphX案例:分析家庭成员关系
现有两个文件,vertexs.csv和edges.csv,分别存储一个家庭的成员及他们的关系。下面使用Spark GraphX分析该家庭成员的关系。
实现代码如下:
import org.apache.spark.graphx.{Edge, Graph} import org.apache.spark.sql.SparkSession ...... def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[*]") .appName("graphx demo") .getOrCreate() val vertexFile = "src/main/data/family/vertexs.csv" val edgeFile = "src/main/data/family/edges.csv" // 构造顶点 val vertices = spark.sparkContext .textFile(vertexFile) .map { line => val fields = line.split(",") ( fields(0).toLong, (fields(1), fields(2)) ) } // 构造边 val edges = spark.sparkContext .textFile(edgeFile) .map { line => val fields = line.split(",") Edge(fields(0).toLong, fields(1).toLong, fields(2)) } // 默认缺失的顶点 val default = ("Unknown", "Missing") // 构造图 val graph = Graph(vertices, edges, default) // 查看图的顶点和边 println( "顶点数量:" + graph.vertices.count ) println( "边的数量:" + graph.edges.count ) // 统计年龄小于40岁的家庭成员数 val c1 = graph.vertices .filter { case (id, (name, age)) => age.toLong > 40 } .count println("\n年龄小于40岁的家庭成员数:" + c1) // 统计父子或母子关系的数量 val c2 = graph.edges .filter { case Edge(from, to, property) => property == "Father" | property == "Mother" } .count println("\n父子或母子关系的数量:" + c2) // println( "Vertices count : " + c1 ) // println( "Edges count : " + c2 ) // 家庭成员影响力统计(使用page rank算法) val tolerance = 0.0001 val ranking = graph.pageRank(tolerance).vertices val rankByPerson = vertices.join(ranking).map { case (id, ((person, age), rank)) => (rank, id, person) } println("\n家庭影响力:") rankByPerson.collect().foreach { case (rank, id, person) => println ( f"Rank $rank%1.2f id $id person $person") } /* 三角形计数算法提供了一个基于顶点的三角形数量计数,与这个顶点相关联。 这对于路由查找非常有用,因为在路由规划中需要生成最小的、无三角的生成树图。 */ val tCount = graph.triangleCount().vertices println("\n三角计数" + tCount.collect().mkString("\n")) /* 当从数据创建一个大图时,它可能包含未连接的子图,即相互隔离的子图,并且它们之间不包含桥接或连接边。 连通组件算法和强连通组件算法提供了这种连接性的度量。 */ val iterations = 1000 val connected = graph.connectedComponents().vertices val connectedS = graph.stronglyConnectedComponents(iterations).vertices // 然后将顶点计数与原始的顶点记录连接起来,这样连接计数就可以与顶点信息相关联,例如此人的姓名 val connByPerson = vertices.join(connected).map { case (id, ((person,age), conn)) => (conn, id, person) } println("\n连通组件:") connByPerson.collect().foreach { case (conn, id, person) => println ( f"Weak $conn $id $person" ) } val connByPersonS = vertices.join(connectedS).map { case (id, ((person,age), conn )) => (conn, id, person) } println("\n强连通组件:") connByPersonS.collect().foreach { case (conn, id, person) => println ( f"Strong $conn $id $person" ) } }
执行以上代码,输出内容如下:
顶点数量:6 边的数量:12 年龄小于40岁的家庭成员数:4 父子或母子关系的数量:4 家庭影响力: Rank 0.15 id 4 person Jim Rank 0.15 id 6 person Flo Rank 1.62 id 2 person Sarah Rank 1.82 id 1 person Mike Rank 1.13 id 3 person John Rank 1.13 id 5 person Kate 三角计数(4,0) (6,0) (2,2) (1,2) (3,1) (5,1) 连通组件: Weak 1 4 Jim Weak 1 6 Flo Weak 1 2 Sarah Weak 1 1 Mike Weak 1 3 John Weak 1 5 Kate 强连通组件: Strong 4 4 Jim Strong 6 6 Flo Strong 1 2 Sarah Strong 1 1 Mike Strong 1 3 John Strong 1 5 Kate