Spark RDD编程案例
Top N 问题
【示例】给出一个员工信息名单,找出收入最高的前10名员工(Top N问题)。
样本数据 employees.csv内容如下:
ename,title,department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate 张三,paramedic i/c,fire,f,salary,,91080.00, 李四,lieutenant,fire,f,salary,,114846.00, 王老五,sergeant,police,f,salary,,104628.00, 赵六,police officer,police,f,salary,,96060.00, 钱七,clerk iii,police,f,salary,,53076.00, 周扒皮,firefighter,fire,f,salary,,87006.00, 吴用,law clerk,law,f,hourly,35,,14.51
实现代码如下。
// RDD实现 val inputPath = "file:///home/hduser/data/spark_demo/employees.csv" val rdd = sc.textFile(inputPath); // def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) val sortedData = rdd.map(_.split(",")) .sortBy(t => if(t(6).length>0) t(6).toFloat else 0.0, false) val top = sortedData.take(10) top.foreach(emp => println(emp.toList.mkString(",")))
执行以上代码,得到如下的结果:
李四,lieutenant,fire,f,salary,,114846.00 王老五,sergeant,police,f,salary,,104628.00 赵六,police officer,police,f,salary,,96060.00 张三,paramedic i/c,fire,f,salary,,91080.00 周扒皮,firefighter,fire,f,salary,,87006.00 钱七,clerk iii,police,f,salary,,53076.00 吴用,law clerk,law,f,hourly,35,,14.51
合并小文件
【例】实现对HDFS上小文件的合并。
使用SparkContext的wholeTextFiles方法和colleasc方法,可以实现对小文件的合并。
import org.apache.spark.sql.SparkSession /** * * 加载整个目录中的文件,使用wholeTextFiles */ object WordCount3 { def main(args: Array[String]): Unit = { // 创建SparkSession实例 - 入口 val spark = SparkSession.builder.master("local[*]").appName("HelloWorld").getOrCreate // 加载数据源,构造RDD val textFiles = spark.sparkContext.wholeTextFiles("input/files") textFiles.map(_._2).coalesce(1).saveAsTextFile("output/one") } }
二次排序
【示例】使用Spark RDD实现二次排序。
什么是二次排序?二次排序就是对于
假设我们有以下输入文件data.txt,其中逗号分割的分别是年、月和总数:
2018,5,22 2019,1,24 2018,2,128 2019,3,56 2019,1,3 2019,2,-43 2019,4,5 2019,3,46 2018,2,64 2019,1,4 2019,1,21 2019,2,35 2019,2,0
我们想要对这些数据排序,期望的输出结果如下:
2018-2 64,128 2018-5 22 2019-1 3,4,21,24 2019-2 -43,0,35 2019-3 46,56 2019-4 5
Spark 二次排序解决方案如下:需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey 函数将同一个Key的所有Value全部分组到一起,然后对同一个Key的所有Value进行排序即可。
// 加载数据集 val inputPath = "file:///home/hduser/data/spark/data.txt" val inputRDD = sc.textFile(inputPath) // 实现二次排序 val sortedRDD = inputRDD .map(line => { val arr = line.split(",") val key = arr(0) + "-" + arr(1) val value = arr(2) (key,value) }) .groupByKey() .map(t => (t._1, t._2.toList.sortWith(_.toInt < _.toInt).mkString(","))) .sortByKey(true) // true:升序,false:降序 // 结果输出 sortedRDD.collect.foreach(t => println(t._1 + "\t" + t._2))