使用GraphX分析航班数据
下面是一个简单的示例,我们将分析三个航班。每次航班,我们都有以下信息:
在这个场景中,我们将机场表示为顶点,而航线表示为边。对于我们的图,我们将有三个顶点,每个顶点代表一个机场。机场之间的距离为航线属性,如下图所示:
其中代表机场的顶点表如下:
代表航线的边表如下:
首先,我们将导入GraphX包:
import org.apache.spark.graphx.{Edge, Graph} import org.apache.spark.sql.SparkSession
然后构建图来表示机场及航线数据。代码如下:
def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[*]") .appName("graphx demo") .getOrCreate() // 我们将机场定义为顶点 // 每个顶点由以下内容组成(机场ID,机场名称) val vertices = Array((1L, "SFO"),(2L, "ORD"),(3L,"DFW")) val vRDD = spark.sparkContext.parallelize(vertices) // 定义一个默认的顶点,叫做nowhere val nowhere = "nowhere" // 边是机场之间的航线。边必须具有源、目标和属性。 // 在这里,边包括: (出港机场id,入港机场id, 航线距离) val edges = Array(Edge(1L,2L,1800),Edge(2L,3L,800),Edge(3L,1L,1400)) val eRDD = spark.sparkContext.parallelize(edges) // 下面创建属性图 val graph = Graph(vRDD, eRDD, nowhere) // 图的顶点(所有机场) println("所有机场:") graph.vertices.collect.foreach(println) // 图的边(所有航线) println("\n所有航线:") graph.edges.collect.foreach(println) }
执行以上代码,输出内容如下:
所有机场: (1,SFO) (2,ORD) (3,DFW) 所有航线: Edge(1,2,1800) Edge(2,3,800) Edge(3,1,1400)
接下来,我们需要回答以下问题:
- 1)有多少个机场?
- 2)有多少条航线?
- 3)哪些航线的距离 > 1000英里?
- 4)显示所有机场和航线的信息。
- 5)排序并输出最长距离的航线。
代码如下所示:
// 1. 有多少个机场? println("有多少个机场? " + graph.numVertices) // 2. 有多少条航线? println("有多少条航线? " + graph.numEdges) // 3. 哪些航线的距离 > 1000英里? println("\n哪些航线的距离超过了1000英里?") graph.edges .filter { case Edge(src, dst, dis) => dis > 1000 } .collect .foreach(println) // 4. 显示所有机场和航线的信息 println("\n显示所有机场和航线的信息:") graph.triplets.collect.foreach(println) // 5. 排序并输出最长距离的航线 println("----------------------------------") graph.triplets .sortBy(_.attr, ascending=false) .map(triplet => "距离: " + triplet.attr + "," + triplet.srcAttr + "-" + triplet.dstAttr + ".") .collect .foreach(println)
执行以上代码,输出内容如下:
有多少个机场? 3 有多少条航线? 3 哪些航线的距离超过了1000英里? Edge(1,2,1800) Edge(3,1,1400) 显示所有机场和航线的信息: ((1,SFO),(2,ORD),1800) ((2,ORD),(3,DFW),800) ((3,DFW),(1,SFO),1400) ---------------------------------- 距离: 1800,SFO-ORD. 距离: 1400,DFW-SFO. 距离: 800,ORD-DFW.