PySpark RDD编程案例_合并小文件与实现二次排序

本节介绍另外两个小案例:

【示例】合并小文件

使用SparkContext的wholeTextFiles方法和colleasc方法,可以实现对小文件的合并。

例如,在/home/hduser/data/spark/files目录下有多个小文件(示例中使用的是本地文件系统路径,可改为HDFS文件系统路径),我们使用下面的代码将该目录下的多个小文件合并为一个文件。

from pyspark.sql import SparkSession

# 构建SparkSession和SparkContext实例
spark = SparkSession.builder \
   .master("spark://xueai8:7077") \
   .appName("pyspark demo") \
   .getOrCreate()

# 加载数据源,构造RDD
textFiles = spark.sparkContext.wholeTextFiles("file:///home/hduser/data/spark/files")

textFiles.map(lambda line: line[1]).coalesce(1).saveAsTextFile("file:///home/hduser/data/spark/files-one")

【示例】使用Spark RDD实现二次排序

什么是二次排序?二次排序就是对于(key,value)类型的数据,不但按key排序,而且每个Key对应的value也是有序的。

假设我们有以下输入文件data.txt,其中逗号分割的分别是年、月和总数:

2018,5,22
2019,1,24
2018,2,128
2019,3,56
2019,1,3
2019,2,-43
2019,4,5
2019,3,46
2018,2,64
2019,1,4
2019,1,21
2019,2,35
2019,2,0

我们想要对这些数据排序,期望的输出结果如下:

2018-2  64,128
2018-5  22
2019-1  3,4,21,24
2019-2  -43,0,35
2019-3  46,56
2019-4  5

Spark 二次排序解决方案如下:需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey 函数将同一个Key的所有Value全部分组到一起,然后对同一个Key的所有Value进行排序即可。

# 加载数据集
inputPath = "file:///home/hduser/data/spark/data.txt"
inputRDD = sc.textFile(inputPath)

# 定义map函数
def customMapFun(line):
    arr = line.split(",")
    key = arr[0] + "-" + arr[1]
    value = arr[2]
    return (key, int(value))

# 实现二次排序
sortedRDD = inputRDD.map(customMapFun).groupByKey().map(lambda t: (t[0],sorted(list(t[1])))).sortByKey()
sortedRDD.collect()

# 结果输出
for t in sortedRDD.collect():
    print(t)

执行以上代码,输出结果如下所示:

('2018-2', [64, 128])
('2018-5', [22])
('2019-1', [3, 4, 21, 24])
('2019-2', [-43, 0, 35])
('2019-3', [46, 56])
('2019-4', [5])

《Spark原理深入与编程实战》