PySpark SQL函数应用示例-二次排序实现

【示例】应用PySpark DataFrame API实现数据的二次排序。

假设在HDFS的/data/spark/路径下有一个输入文件data.txt,内容如下:

2018,5,22
2019,1,24
2018,2,128
2019,3,56
2019,1,3
2019,2,-43
2019,4,5
2019,3,46
2018,2,64
2019,1,4
2019,1,21
2019,2,35
2019,2,0

其中每一行由逗号分割的分别是年、月和总数。现在想要对这些数据排序,期望的输出结果是先按年月进行排序,相同年月的情况下,数值列表按大小排序,如下所示:

2018-2  64,128
2018-5  22
2019-1  3,4,21,24
2019-2  -43,0,35
2019-3  46,56
2019-4  5

请按以下步骤实现。

(1) 加载数据集到一个DataFrame中,代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark demo") \
   .getOrCreate()

# 加载数据集
inputPath = "/data/spark/data.txt"
inputDF = spark.read \
               .option("inferSchema","true") \
               .option("header","false") \
               .csv(inputPath) \
               .toDF("year","month","cnt")

inputDF.show()

执行以上代码,输出结果如下:

+----+-----+---+
|year|month|cnt|
+----+-----+---+
|2018|     5| 22|
|2019|     1| 24|
|2018|     2|128|
|2019|     3| 56|
|2019|     1|  3|
|2019|     2|-43|
|2019|     4|  5|
|2019|     3| 46|
|2018|     2| 64|
|2019|     1|  4|
|2019|     1| 21|
|2019|     2| 35|
|2019|     2|  0|
+----+-----+---+

(2) 将year和month组合为一列,并取别名ym,代码如下:

from pyspark.sql.functions import *

df2 = inputDF.select(concat_ws("-","year","month").alias("ym"), "cnt")
df2.printSchema()
df2.show()

执行以上代码,输出结果如下:

root
 |-- ym: string (nullable = false)
 |-- cnt: integer (nullable = true)

+------+---+
|     ym|cnt|
+------+---+
|2018-5| 22|
|2019-1| 24|
|2018-2|128|
|2019-3| 56|
|2019-1|  3|
|2019-2|-43|
|2019-4|  5|
|2019-3| 46|
|2018-2| 64|
|2019-1|  4|
|2019-1| 21|
|2019-2| 35|
|2019-2|  0|
+------+---+

(3) 先按ym进行分组聚合,然后对每一组的cnt列进行排序,并输出,代码如下:

df2.groupBy("ym") \
   .agg(sort_array(collect_list("cnt")).alias("cnt")) \
   .orderBy("ym") \
   .show()

执行以上代码,输出结果如下:

+------+--------------+
|    ym|              cnt|
+------+--------------+
|2018-2|      [64, 128]|
|2018-5|            [22]|
|2019-1|[3, 4, 21, 24]|
|2019-2|  [-43, 0, 35]|
|2019-3|       [46, 56]|
|2019-4|             [5]|
+------+--------------+

(4) 最后,可以把上面的代码写到一个ETL处理逻辑当中,代码如下:

from pyspark.sql.functions import *

inputPath = "/data/spark/data.txt"

spark.read \
     .option("inferSchema","true") \
     .option("header","false") \
     .csv(inputPath) \
     .toDF("year","month","cnt") \
     .select(concat_ws("-","year","month").alias("ym"),"cnt") \
     .groupBy("ym") \
     .agg(sort_array(collect_list("cnt")).alias("cnt")) \
     .orderBy("ym") \
     .show()

《Flink原理深入与编程实战》