Spark 3新增数组函数

2021-11-15 17:12:21.0

Spark 3新增加了许多数组函数,以方便数据处理。下面我们为大家逐一介绍。

exists函数

方法签名:

def exists(column: Column, f: (Column) ⇒ Column): Column

功能:判断数组列column中是否存在满足断言f的元素,返回Boolean值。

请看下面的应用示例。

1) 构造一个包含数组值的DataFrame:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// 数据集
val data = List(
    ("a", Array(3, 4, 5)),
    ("b", Array(8, 12)),
    ("c", Array(7, 13)),
    ("d", null),
  )

// 列名
val columns = List("person_id","best_numbers")

// 创建DataFrame
val df = data.toDF(columns:_*)

// 查看
df.printSchema
df.show

执行以上代码,输出结果如下:

root
 |-- person_id: string (nullable = true)
 |-- best_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = false)

+---------+------------+
|person_id|best_numbers|
+---------+------------+
|        a|   [3, 4, 5]|
|        b|     [8, 12]|
|        c|     [7, 13]|
|        d|        null|
+---------+------------+

2) 创建列函数,如果列值是偶数,则是返回true。

import org.apache.spark.sql._

def isEven(col: Column): Column = {
  col % 2 === lit(0)
}

3) 为DataFrame增加一个新列,列值来自于对判断'best_numbers'列是否包含偶数值的判断(Boolean值)。

import org.apache.spark.sql.functions.exists

// 为DataFrame增加一个新列
val resDF = df.withColumn("even_best_number_exists", exists(col("best_numbers"), isEven))
resDF.show

执行以上代码,输出结果如下:

+---------+------------+-----------------------+
|person_id|best_numbers|even_best_number_exists|
+---------+------------+-----------------------+
|        a|   [3, 4, 5]|                   true|
|        b|     [8, 12]|                   true|
|        c|     [7, 13]|                  false|
|        d|        null|                   null|
+---------+------------+-----------------------+

也可以使用下面这样的简洁写法:

val resDF2 = resDF.withColumn("even2_best_number_exists", exists(col("best_numbers"), _ % 2 === 0))
resDF2.show

执行以上代码,输出结果如下:

+---------+------------+-----------------------+------------------------+
|person_id|best_numbers|even_best_number_exists|even2_best_number_exists|
+---------+------------+-----------------------+------------------------+
|        a|   [3, 4, 5]|                   true|                    true|
|        b|     [8, 12]|                   true|                    true|
|        c|     [7, 13]|                  false|                   false|
|        d|        null|                   null|                    null|
+---------+------------+-----------------------+------------------------+

forall函数

方法签名:

def forall(column: Column, f: (Column) ⇒ Column): Column

功能:判断数组列column中是否所有元素都满足断言f,返回Boolean值。

请看下面的应用示例。

1) 构造一个包含数组值的DataFrame:

// 数据集
val data = List(
    (Array("ants", "are", "animals")),
    (Array("italy", "is", "interesting")),
    (Array("brazilians", "love", "soccer")),
    (null),
  )

// 创建DataFrame
val df = data.toDF("words")

df.printSchema
df.show(false)

执行以上代码,输出结果如下:

root
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------------+
|words                     |
+--------------------------+
|[ants, are, animals]      |
|[italy, is, interesting]  |
|[brazilians, love, soccer]|
|null                      |
+--------------------------+

用forall来标出所有元素(单词)是以字母“a”开头的数组。

val resDF = df.withColumn("uses_alliteration_with_a", forall(col("words"), _.startsWith("a") ))
resDF.show(false)

执行以上代码,输出结果如下:

+--------------------------+------------------------+
|words                     |uses_alliteration_with_a|
+--------------------------+------------------------+
|[ants, are, animals]      |true                    |
|[italy, is, interesting]  |false                   |
|[brazilians, love, soccer]|false                   |
|null                      |null                    |
+--------------------------+------------------------+

filter函数

方法签名有两个:

def  filter(column: Column, f: (Column, Column) ⇒ Column): Column     // f: (col, index)
def  filter(column: Column, f: (Column) ⇒ Column): Column

功能:过滤数组中满足断言f的值。

请看下面的应用示例。

1) 构造一个包含数组值的DataFrame:

val data = List(
    (Array("bad", "bunny", "is", "funny")),
    (Array("food", "is", "bad", "tasty")),
    (null),
  )

// 创建DataFrame
val df = data.toDF("words")

df.printSchema
df.show(false)

执行以上代码,输出结果如下:

root
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)

+-----------------------+
|words                  |
+-----------------------+
|[bad, bunny, is, funny]|
|[food, is, bad, tasty] |
|null                   |
+-----------------------+

过滤掉words列中的word单词。

val resDF = df.withColumn("filtered_words", filter(col("words"), _ =!= "bad"))
resDF.show(false)

执行以上代码,输出结果如下:

+-----------------------+------------------+
|words                  |filtered_words    |
+-----------------------+------------------+
|[bad, bunny, is, funny]|[bunny, is, funny]|
|[food, is, bad, tasty] |[food, is, tasty] |
|null                   |null              |
+-----------------------+------------------+

transform

方法签名有两个:

def transform(column: Column, f: (Column, Column) ⇒ Column): Column     // f: (col, index)
def transform(column: Column, f: (Column) ⇒ Column): Column

功能:对数组中的每个元素应用transform函数,返回一个新的数组。

请看下面的应用示例。

1) 构造一个包含数组值的DataFrame:

val data = List(
    (Array("New York", "Seattle")),
    (Array("Barcelona", "Bangalore")),
    (null),
  )

// 创建DataFrame
val df = data.toDF("places")

df.printSchema
df.show(false)

执行以上代码,输出结果如下:

root
 |-- places: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------------+
|places                |
+----------------------+
|[New York, Seattle]   |
|[Barcelona, Bangalore]|
|null                  |
+----------------------+

对places列中的每个数组元素应用transform转换。

val resDF = df.withColumn("fun_places", transform(col("places"), concat(_, lit(" is fun!")) ))
resDF.show(false)

执行以上代码,输出结果如下:

+----------------------+--------------------------------------+
|places                |fun_places                            |
+----------------------+--------------------------------------+
|[New York, Seattle]   |[New York is fun!, Seattle is fun!]   |
|[Barcelona, Bangalore]|[Barcelona is fun!, Bangalore is fun!]|
|null                  |null                                  |
+----------------------+--------------------------------------+

aggregate函数

方法签名有两个:

def aggregate(expr: Column, initialValue: Column, merge: (Column, Column) ⇒ Column): Column
def aggregate(expr: Column, initialValue: Column, merge: (Column, Column) ⇒ Column, finish: (Column) ⇒ Column): Column

方法签名中各参数的含义如下:

  • expr:输入数组列
  • initialValue:初始值
  • merge:(combined_value, input_value) => combined_value, merge函数将一个输入值合并到combined_value
  • finish:combined_value => final_value, 这个lambda函数将所有输入的combined value转换为最终结果

功能:对数组元素进行指定的聚合。它将二元运算符应用于初始状态和数组中的所有元素,并将其简化为单一状态。如果有finish函数参数,则应用finish函数将最终状态转换为最终结果。

请看下面的应用示例。

1) 构造一个包含数组值的DataFrame:

val data = List(
    (Array(1, 2, 3, 4)),
    (Array(5, 6, 7)),
    (null),
  )
  
// 创建DataFrame
val df = data.toDF("numbers")

df.printSchema
df.show(false)

执行以上代码,输出结果如下:

root
 |-- numbers: array (nullable = true)
 |    |-- element: integer (containsNull = false)

+------------+
|numbers     |
+------------+
|[1, 2, 3, 4]|
|[5, 6, 7]   |
|null        |
+------------+

对numbers列的数组进行求和,计算的结果值作为新列numbers_sum的值。

val resDF = df.withColumn("numbers_sum", aggregate(col("numbers"), lit(0),  _ + _ ))
resDF.show

执行以上代码,输出结果如下:

+------------+-----------+
|     numbers|numbers_sum|
+------------+-----------+
|[1, 2, 3, 4]|         10|
|   [5, 6, 7]|         18|
|        null|       null|
+------------+-----------+

zip_with函数

方法签名:

def zip_with(left: Column, right: Column, f: (Column, Column) ⇒ Column): Column

方法签名中各参数的含义如下:

  • left:左边的输入数组列
  • right:右边的输入数组列
  • f:(lCol, rCol) => col, 将两个输入列合并为一列的lambda函数

功能:使用该函数将两个给定的数组按元素合并为一个数组。如果一个数组较短,则在应用此函数之前,在其末尾添加空值以匹配较长的数组的长度。

请看下面的应用示例。

1) 构造一个包含数组值的DataFrame:

val data = List(
    (Array("a", "b"), Array("c", "d")),
    (Array("x", "y"), Array("p", "o")),
    (null, Array("e", "r"))
  )

// 列名
val columns = List("letters1","letters2")

// 创建DataFrame
val df = data.toDF(columns:_*)

df.printSchema
df.show(false)

执行以上代码,输出结果如下:

root
 |-- letters1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- letters2: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------+--------+
|letters1|letters2|
+--------+--------+
|[a, b]  |[c, d]  |
|[x, y]  |[p, o]  |
|null    |[e, r]  |
+--------+--------+


val resDF = df.withColumn(
  "zipped_letters",
  zip_with(
    col("letters1"),
    col("letters2"),
    concat_ws("***", _, _)   // (left: Column, right: Column) => concat_ws("***", left, right)
  )
)

resDF.show(false)

执行以上代码,输出结果如下:

+--------+--------+--------------+
|letters1|letters2|zipped_letters|
+--------+--------+--------------+
|[a, b]  |[c, d]  |[a***c, b***d]|
|[x, y]  |[p, o]  |[x***p, y***o]|
|null    |[e, r]  |null          |
+--------+--------+--------------+

《PySpark原理深入与编程实战》