PySpark RDD编程案例_合并小文件与实现二次排序
本节介绍另外两个小案例:
【示例】合并小文件
使用SparkContext的wholeTextFiles方法和colleasc方法,可以实现对小文件的合并。
例如,在/home/hduser/data/spark/files目录下有多个小文件(示例中使用的是本地文件系统路径,可改为HDFS文件系统路径),我们使用下面的代码将该目录下的多个小文件合并为一个文件。
from pyspark.sql import SparkSession # 构建SparkSession和SparkContext实例 spark = SparkSession.builder \ .master("spark://xueai8:7077") \ .appName("pyspark demo") \ .getOrCreate() # 加载数据源,构造RDD textFiles = spark.sparkContext.wholeTextFiles("file:///home/hduser/data/spark/files") textFiles.map(lambda line: line[1]).coalesce(1).saveAsTextFile("file:///home/hduser/data/spark/files-one")
【示例】使用Spark RDD实现二次排序
什么是二次排序?二次排序就是对于(key,value)类型的数据,不但按key排序,而且每个Key对应的value也是有序的。
假设我们有以下输入文件data.txt,其中逗号分割的分别是年、月和总数:
2018,5,22 2019,1,24 2018,2,128 2019,3,56 2019,1,3 2019,2,-43 2019,4,5 2019,3,46 2018,2,64 2019,1,4 2019,1,21 2019,2,35 2019,2,0
我们想要对这些数据排序,期望的输出结果如下:
2018-2 64,128 2018-5 22 2019-1 3,4,21,24 2019-2 -43,0,35 2019-3 46,56 2019-4 5
Spark 二次排序解决方案如下:需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey 函数将同一个Key的所有Value全部分组到一起,然后对同一个Key的所有Value进行排序即可。
# 加载数据集 inputPath = "file:///home/hduser/data/spark/data.txt" inputRDD = sc.textFile(inputPath) # 定义map函数 def customMapFun(line): arr = line.split(",") key = arr[0] + "-" + arr[1] value = arr[2] return (key, int(value)) # 实现二次排序 sortedRDD = inputRDD.map(customMapFun).groupByKey().map(lambda t: (t[0],sorted(list(t[1])))).sortByKey() sortedRDD.collect() # 结果输出 for t in sortedRDD.collect(): print(t)
执行以上代码,输出结果如下所示:
('2018-2', [64, 128]) ('2018-5', [22]) ('2019-1', [3, 4, 21, 24]) ('2019-2', [-43, 0, 35]) ('2019-3', [46, 56]) ('2019-4', [5])