发布日期:2022-11-29 VIP内容

Spark DataFrame行转列实现

什么是DataFrame的行转列?

例如,有这样的需求:“请计算每个省份累计订单量,然后根据每个省份订单量从高到低排列”。如下所示:

+--------+--------+
|province|  amount|
+--------+--------+
|     A省|   10122|
|     B省|     301|
|     C省|23333333|
+--------+--------+

要求以下面指定的格式输出结果:

+-----+---+--------+
|  A省|B省|     C省|
+-----+---+--------+
|10122|301|23333333|
+-----+---+--------+

这就是通常所说的“行转列”。

DataFrame的行转列实现方法

在Spark中有多种方式可以实现这样的行转列。首先构造一个初始的“省份-订单”数据表,代码如下:

val data = Seq(("A省",10122), ("B省",301), ("C省",23333333))

val df = data.toDF("province","amount")
df.show() 

执行以上代码,输出内容如下:

+--------+--------+
|province|  amount|
+--------+--------+
|     A省|   10122|
|     B省|     301|
|     C省|23333333|
+--------+--------+

下面我们介绍三种实现DataFrame行转列的方法。

方法一:使用pivot()函数

最简单的方法是使用pivot()透视函数。实现代码如下:

df
  .groupBy()
  .pivot("province").agg(first("amount"))
  .show()

执行以上代码,输出结果如下:

+-----+---+--------+
|  A省|B省|     C省|
+-----+---+--------+
|10122|301|23333333|
+-----+---+--------+

可以看到,很容易就实现了DataFrame的行转列。不过这个行转列离一开始的需求还有点差距,订单量没有按顺序由大到小排列。在这种情况下,我们可以使用另外不采用pivot()函数的两种转换方法。

方法二:使用虚拟表

当数据量不大时,可以先将DataFrame转换为本地的一个List,然后通过使用foldLeft()函数在一个虚拟表上依次组合结果,代码如下:

// 当数据量不大时,首先collect到一个List
val mp = df.orderBy($"amount".desc).as[(String,Long)].collect.toList

// 创建一个1*1的表
val f = Seq("1").toDF("dummy")

// 组合
mp.foldLeft(f){(acc,x) => acc.withColumn(x._1,lit(x._2)) }.drop("dummy").show(false) 

执行以上代码,输出结果如下:

+--------+-----+---+
|C省     |A省  |B省|
+--------+-----+---+
|23333333|10122|301|
+--------+-----+---+

从结果可以看到,不但实现了DataFrame的行转列,而且也按订单量实现了由大到小的排序。

方法三:手动合成

当数据量不大时,可以先将DataFrame转换为本地的一个Map,然后手动提取列名和值,并组合结果。实现代码如下:

import org.apache.spark.sql.types._

// 当数据量不大时,首先collect到一个Map
val mp = df.orderBy($"amount".desc).as[(String,Long)].collect.toMap
val (keys, values) = mp.toList.sortBy(_._2).reverse.unzip

println(keys)
println(values)

import org.apache.spark.sql._

val schema = StructType(keys.map(k => StructField(k, LongType, nullable = false)))
val rows = spark.sparkContext.parallelize(Seq(Row(values: _*)))

val df_new = spark.createDataFrame(rows, schema)
df_new.show()

执行以上代码,输出结果如下:

List(C省, A省, B省)
List(23333333, 10122, 301)
+--------+-----+---+
|     C省|  A省|B省|
+--------+-----+---+
|23333333|10122|301|
+--------+-----+---+

从结果可以看到,不但实现了DataFrame的行转列,而且也按订单量实现了由大到小的排序。