Spark RDD编程案例

Top N 问题

【示例】给出一个员工信息名单,找出收入最高的前10名员工(Top N问题)。

样本数据 employees.csv内容如下:

ename,title,department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate
张三,paramedic i/c,fire,f,salary,,91080.00,
李四,lieutenant,fire,f,salary,,114846.00,
王老五,sergeant,police,f,salary,,104628.00,
赵六,police officer,police,f,salary,,96060.00,
钱七,clerk iii,police,f,salary,,53076.00,
周扒皮,firefighter,fire,f,salary,,87006.00,
吴用,law clerk,law,f,hourly,35,,14.51

实现代码如下。

// RDD实现
val inputPath = "file:///home/hduser/data/spark_demo/employees.csv"
val rdd = sc.textFile(inputPath);

// def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)
val sortedData = rdd.map(_.split(","))
                    .sortBy(t => if(t(6).length>0) t(6).toFloat else 0.0, false)

val top = sortedData.take(10)

top.foreach(emp => println(emp.toList.mkString(",")))

执行以上代码,得到如下的结果:

李四,lieutenant,fire,f,salary,,114846.00
王老五,sergeant,police,f,salary,,104628.00
赵六,police officer,police,f,salary,,96060.00
张三,paramedic i/c,fire,f,salary,,91080.00
周扒皮,firefighter,fire,f,salary,,87006.00
钱七,clerk iii,police,f,salary,,53076.00
吴用,law clerk,law,f,hourly,35,,14.51

合并小文件

【例】实现对HDFS上小文件的合并。

使用SparkContext的wholeTextFiles方法和colleasc方法,可以实现对小文件的合并。

import org.apache.spark.sql.SparkSession

/**
  *
  * 加载整个目录中的文件,使用wholeTextFiles
  */
object WordCount3 {
  def main(args: Array[String]): Unit = {

    // 创建SparkSession实例 - 入口
    val spark = SparkSession.builder.master("local[*]").appName("HelloWorld").getOrCreate

    // 加载数据源,构造RDD
    val textFiles = spark.sparkContext.wholeTextFiles("input/files")

    textFiles.map(_._2).coalesce(1).saveAsTextFile("output/one")
  }
}

二次排序

【示例】使用Spark RDD实现二次排序。

什么是二次排序?二次排序就是对于类型的数据,不但按key排序,而且每个Key对应的value也是有序的。

假设我们有以下输入文件data.txt,其中逗号分割的分别是年、月和总数:

    2018,5,22
    2019,1,24
    2018,2,128
    2019,3,56
    2019,1,3
    2019,2,-43
    2019,4,5
    2019,3,46
    2018,2,64
    2019,1,4
    2019,1,21
    2019,2,35
    2019,2,0

我们想要对这些数据排序,期望的输出结果如下:

    2018-2  64,128
    2018-5  22
    2019-1  3,4,21,24
    2019-2  -43,0,35
    2019-3  46,56
    2019-4  5

Spark 二次排序解决方案如下:需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey 函数将同一个Key的所有Value全部分组到一起,然后对同一个Key的所有Value进行排序即可。

// 加载数据集
val inputPath = "file:///home/hduser/data/spark/data.txt"
val inputRDD = sc.textFile(inputPath)

// 实现二次排序
val sortedRDD = inputRDD
                  .map(line => {
                    val arr = line.split(",")
                    val key = arr(0) + "-" + arr(1)
                    val value = arr(2)
                    (key,value)
                  })
                  .groupByKey()
                  .map(t => (t._1, t._2.toList.sortWith(_.toInt < _.toInt).mkString(",")))
                  .sortByKey(true)          // true:升序,false:降序

// 结果输出
sortedRDD.collect.foreach(t => println(t._1 + "\t" + t._2))

《Flink原理深入与编程实战》