PySpark RDD编程案例_合并小文件与实现二次排序
本节介绍另外两个小案例:
【示例】合并小文件
使用SparkContext的wholeTextFiles方法和colleasc方法,可以实现对小文件的合并。
例如,在/home/hduser/data/spark/files目录下有多个小文件(示例中使用的是本地文件系统路径,可改为HDFS文件系统路径),我们使用下面的代码将该目录下的多个小文件合并为一个文件。
from pyspark.sql import SparkSession
# 构建SparkSession和SparkContext实例
spark = SparkSession.builder \
.master("spark://xueai8:7077") \
.appName("pyspark demo") \
.getOrCreate()
# 加载数据源,构造RDD
textFiles = spark.sparkContext.wholeTextFiles("file:///home/hduser/data/spark/files")
textFiles.map(lambda line: line[1]).coalesce(1).saveAsTextFile("file:///home/hduser/data/spark/files-one")
【示例】使用Spark RDD实现二次排序
什么是二次排序?二次排序就是对于(key,value)类型的数据,不但按key排序,而且每个Key对应的value也是有序的。
假设我们有以下输入文件data.txt,其中逗号分割的分别是年、月和总数:
2018,5,22 2019,1,24 2018,2,128 2019,3,56 2019,1,3 2019,2,-43 2019,4,5 2019,3,46 2018,2,64 2019,1,4 2019,1,21 2019,2,35 2019,2,0
我们想要对这些数据排序,期望的输出结果如下:
2018-2 64,128 2018-5 22 2019-1 3,4,21,24 2019-2 -43,0,35 2019-3 46,56 2019-4 5
Spark 二次排序解决方案如下:需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey 函数将同一个Key的所有Value全部分组到一起,然后对同一个Key的所有Value进行排序即可。
# 加载数据集
inputPath = "file:///home/hduser/data/spark/data.txt"
inputRDD = sc.textFile(inputPath)
# 定义map函数
def customMapFun(line):
arr = line.split(",")
key = arr[0] + "-" + arr[1]
value = arr[2]
return (key, int(value))
# 实现二次排序
sortedRDD = inputRDD.map(customMapFun).groupByKey().map(lambda t: (t[0],sorted(list(t[1])))).sortByKey()
sortedRDD.collect()
# 结果输出
for t in sortedRDD.collect():
print(t)
执行以上代码,输出结果如下所示:
('2018-2', [64, 128])
('2018-5', [22])
('2019-1', [3, 4, 21, 24])
('2019-2', [-43, 0, 35])
('2019-3', [46, 56])
('2019-4', [5])