PySpark SQL函数应用示例-二次排序实现
【示例】应用PySpark DataFrame API实现数据的二次排序。
假设在HDFS的/data/spark/路径下有一个输入文件data.txt,内容如下:
2018,5,22 2019,1,24 2018,2,128 2019,3,56 2019,1,3 2019,2,-43 2019,4,5 2019,3,46 2018,2,64 2019,1,4 2019,1,21 2019,2,35 2019,2,0
其中每一行由逗号分割的分别是年、月和总数。现在想要对这些数据排序,期望的输出结果是先按年月进行排序,相同年月的情况下,数值列表按大小排序,如下所示:
2018-2 64,128 2018-5 22 2019-1 3,4,21,24 2019-2 -43,0,35 2019-3 46,56 2019-4 5
请按以下步骤实现。
(1) 加载数据集到一个DataFrame中,代码如下:
from pyspark.sql import SparkSession from pyspark.sql.functions import * spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark demo") \ .getOrCreate() # 加载数据集 inputPath = "/data/spark/data.txt" inputDF = spark.read \ .option("inferSchema","true") \ .option("header","false") \ .csv(inputPath) \ .toDF("year","month","cnt") inputDF.show()
执行以上代码,输出结果如下:
+----+-----+---+ |year|month|cnt| +----+-----+---+ |2018| 5| 22| |2019| 1| 24| |2018| 2|128| |2019| 3| 56| |2019| 1| 3| |2019| 2|-43| |2019| 4| 5| |2019| 3| 46| |2018| 2| 64| |2019| 1| 4| |2019| 1| 21| |2019| 2| 35| |2019| 2| 0| +----+-----+---+
(2) 将year和month组合为一列,并取别名ym,代码如下:
from pyspark.sql.functions import * df2 = inputDF.select(concat_ws("-","year","month").alias("ym"), "cnt") df2.printSchema() df2.show()
执行以上代码,输出结果如下:
root |-- ym: string (nullable = false) |-- cnt: integer (nullable = true) +------+---+ | ym|cnt| +------+---+ |2018-5| 22| |2019-1| 24| |2018-2|128| |2019-3| 56| |2019-1| 3| |2019-2|-43| |2019-4| 5| |2019-3| 46| |2018-2| 64| |2019-1| 4| |2019-1| 21| |2019-2| 35| |2019-2| 0| +------+---+
(3) 先按ym进行分组聚合,然后对每一组的cnt列进行排序,并输出,代码如下:
df2.groupBy("ym") \ .agg(sort_array(collect_list("cnt")).alias("cnt")) \ .orderBy("ym") \ .show()
执行以上代码,输出结果如下:
+------+--------------+ | ym| cnt| +------+--------------+ |2018-2| [64, 128]| |2018-5| [22]| |2019-1|[3, 4, 21, 24]| |2019-2| [-43, 0, 35]| |2019-3| [46, 56]| |2019-4| [5]| +------+--------------+
(4) 最后,可以把上面的代码写到一个ETL处理逻辑当中,代码如下:
from pyspark.sql.functions import * inputPath = "/data/spark/data.txt" spark.read \ .option("inferSchema","true") \ .option("header","false") \ .csv(inputPath) \ .toDF("year","month","cnt") \ .select(concat_ws("-","year","month").alias("ym"),"cnt") \ .groupBy("ym") \ .agg(sort_array(collect_list("cnt")).alias("cnt")) \ .orderBy("ym") \ .show()