GraphX案例:分析家庭成员关系

现有两个文件,vertexs.csv和edges.csv,分别存储一个家庭的成员及他们的关系。下面使用Spark GraphX分析该家庭成员的关系。

实现代码如下:

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession
......

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("graphx demo")
      .getOrCreate()

    val vertexFile = "src/main/data/family/vertexs.csv"
    val edgeFile = "src/main/data/family/edges.csv"

    // 构造顶点
    val vertices = spark.sparkContext
      .textFile(vertexFile)
      .map { line =>
        val fields = line.split(",")
        ( fields(0).toLong, (fields(1), fields(2)) )
      }

    // 构造边
    val edges = spark.sparkContext
      .textFile(edgeFile)
      .map { line =>
        val fields = line.split(",")
        Edge(fields(0).toLong, fields(1).toLong, fields(2))
      }

    // 默认缺失的顶点
    val default = ("Unknown", "Missing")

    // 构造图
    val graph = Graph(vertices, edges, default)

    // 查看图的顶点和边
    println( "顶点数量:" + graph.vertices.count )
    println( "边的数量:" + graph.edges.count )

    // 统计年龄小于40岁的家庭成员数
    val c1 = graph.vertices
      .filter {
        case (id, (name, age)) => age.toLong > 40
      }
      .count
    println("\n年龄小于40岁的家庭成员数:" + c1)

    // 统计父子或母子关系的数量
    val c2 = graph.edges
      .filter {
        case Edge(from, to, property)  => property == "Father" | property == "Mother"
      }
      .count
    println("\n父子或母子关系的数量:" + c2)

    // println( "Vertices count : " + c1 )
    // println( "Edges count : " + c2 )

    // 家庭成员影响力统计(使用page rank算法)
    val tolerance = 0.0001
    val ranking = graph.pageRank(tolerance).vertices
    val rankByPerson = vertices.join(ranking).map {
      case (id, ((person, age), rank)) => (rank, id, person)
    }

    println("\n家庭影响力:")
    rankByPerson.collect().foreach {
      case (rank, id, person) => println ( f"Rank $rank%1.2f id $id person $person")
    }

    /*
    三角形计数算法提供了一个基于顶点的三角形数量计数,与这个顶点相关联。
    这对于路由查找非常有用,因为在路由规划中需要生成最小的、无三角的生成树图。
    */
    val tCount = graph.triangleCount().vertices
    println("\n三角计数" + tCount.collect().mkString("\n"))

    /*
    当从数据创建一个大图时,它可能包含未连接的子图,即相互隔离的子图,并且它们之间不包含桥接或连接边。
    连通组件算法和强连通组件算法提供了这种连接性的度量。
    */
    val iterations = 1000
    val connected = graph.connectedComponents().vertices
    val connectedS = graph.stronglyConnectedComponents(iterations).vertices

    // 然后将顶点计数与原始的顶点记录连接起来,这样连接计数就可以与顶点信息相关联,例如此人的姓名
    val connByPerson = vertices.join(connected).map {
      case (id, ((person,age), conn)) => (conn, id, person)
    }
    println("\n连通组件:")
    connByPerson.collect().foreach {
      case (conn, id, person) => println ( f"Weak $conn $id $person" )
    }

    val connByPersonS = vertices.join(connectedS).map {
      case (id, ((person,age), conn )) => (conn, id, person)
    }
    println("\n强连通组件:")
    connByPersonS.collect().foreach {
      case (conn, id, person) => println ( f"Strong $conn $id $person" )
    }
  }

执行以上代码,输出内容如下:

顶点数量:6
边的数量:12

年龄小于40岁的家庭成员数:4

父子或母子关系的数量:4

家庭影响力:
Rank 0.15 id 4 person Jim
Rank 0.15 id 6 person Flo
Rank 1.62 id 2 person Sarah
Rank 1.82 id 1 person Mike
Rank 1.13 id 3 person John
Rank 1.13 id 5 person Kate

三角计数(4,0)
(6,0)
(2,2)
(1,2)
(3,1)
(5,1)

连通组件:
Weak 1 4 Jim
Weak 1 6 Flo
Weak 1 2 Sarah
Weak 1 1 Mike
Weak 1 3 John
Weak 1 5 Kate

强连通组件:
Strong 4 4 Jim
Strong 6 6 Flo
Strong 1 2 Sarah
Strong 1 1 Mike
Strong 1 3 John
Strong 1 5 Kate

《Flink原理深入与编程实战》