Spark 3新增数组函数
2021-11-15 17:12:21.0
Spark 3新增加了许多数组函数,以方便数据处理。下面我们为大家逐一介绍。
exists函数
方法签名:
def exists(column: Column, f: (Column) ⇒ Column): Column
功能:判断数组列column中是否存在满足断言f的元素,返回Boolean值。
请看下面的应用示例。
1) 构造一个包含数组值的DataFrame:
import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ // 数据集 val data = List( ("a", Array(3, 4, 5)), ("b", Array(8, 12)), ("c", Array(7, 13)), ("d", null), ) // 列名 val columns = List("person_id","best_numbers") // 创建DataFrame val df = data.toDF(columns:_*) // 查看 df.printSchema df.show
执行以上代码,输出结果如下:
root |-- person_id: string (nullable = true) |-- best_numbers: array (nullable = true) | |-- element: integer (containsNull = false) +---------+------------+ |person_id|best_numbers| +---------+------------+ | a| [3, 4, 5]| | b| [8, 12]| | c| [7, 13]| | d| null| +---------+------------+
2) 创建列函数,如果列值是偶数,则是返回true。
import org.apache.spark.sql._ def isEven(col: Column): Column = { col % 2 === lit(0) }
3) 为DataFrame增加一个新列,列值来自于对判断'best_numbers'列是否包含偶数值的判断(Boolean值)。
import org.apache.spark.sql.functions.exists // 为DataFrame增加一个新列 val resDF = df.withColumn("even_best_number_exists", exists(col("best_numbers"), isEven)) resDF.show
执行以上代码,输出结果如下:
+---------+------------+-----------------------+ |person_id|best_numbers|even_best_number_exists| +---------+------------+-----------------------+ | a| [3, 4, 5]| true| | b| [8, 12]| true| | c| [7, 13]| false| | d| null| null| +---------+------------+-----------------------+
也可以使用下面这样的简洁写法:
val resDF2 = resDF.withColumn("even2_best_number_exists", exists(col("best_numbers"), _ % 2 === 0)) resDF2.show
执行以上代码,输出结果如下:
+---------+------------+-----------------------+------------------------+ |person_id|best_numbers|even_best_number_exists|even2_best_number_exists| +---------+------------+-----------------------+------------------------+ | a| [3, 4, 5]| true| true| | b| [8, 12]| true| true| | c| [7, 13]| false| false| | d| null| null| null| +---------+------------+-----------------------+------------------------+
forall函数
方法签名:
def forall(column: Column, f: (Column) ⇒ Column): Column
功能:判断数组列column中是否所有元素都满足断言f,返回Boolean值。
请看下面的应用示例。
1) 构造一个包含数组值的DataFrame:
// 数据集 val data = List( (Array("ants", "are", "animals")), (Array("italy", "is", "interesting")), (Array("brazilians", "love", "soccer")), (null), ) // 创建DataFrame val df = data.toDF("words") df.printSchema df.show(false)
执行以上代码,输出结果如下:
root |-- words: array (nullable = true) | |-- element: string (containsNull = true) +--------------------------+ |words | +--------------------------+ |[ants, are, animals] | |[italy, is, interesting] | |[brazilians, love, soccer]| |null | +--------------------------+
用forall来标出所有元素(单词)是以字母“a”开头的数组。
val resDF = df.withColumn("uses_alliteration_with_a", forall(col("words"), _.startsWith("a") )) resDF.show(false)
执行以上代码,输出结果如下:
+--------------------------+------------------------+ |words |uses_alliteration_with_a| +--------------------------+------------------------+ |[ants, are, animals] |true | |[italy, is, interesting] |false | |[brazilians, love, soccer]|false | |null |null | +--------------------------+------------------------+
filter函数
方法签名有两个:
def filter(column: Column, f: (Column, Column) ⇒ Column): Column // f: (col, index) def filter(column: Column, f: (Column) ⇒ Column): Column
功能:过滤数组中满足断言f的值。
请看下面的应用示例。
1) 构造一个包含数组值的DataFrame:
val data = List( (Array("bad", "bunny", "is", "funny")), (Array("food", "is", "bad", "tasty")), (null), ) // 创建DataFrame val df = data.toDF("words") df.printSchema df.show(false)
执行以上代码,输出结果如下:
root |-- words: array (nullable = true) | |-- element: string (containsNull = true) +-----------------------+ |words | +-----------------------+ |[bad, bunny, is, funny]| |[food, is, bad, tasty] | |null | +-----------------------+
过滤掉words列中的word单词。
val resDF = df.withColumn("filtered_words", filter(col("words"), _ =!= "bad")) resDF.show(false)
执行以上代码,输出结果如下:
+-----------------------+------------------+ |words |filtered_words | +-----------------------+------------------+ |[bad, bunny, is, funny]|[bunny, is, funny]| |[food, is, bad, tasty] |[food, is, tasty] | |null |null | +-----------------------+------------------+
transform
方法签名有两个:
def transform(column: Column, f: (Column, Column) ⇒ Column): Column // f: (col, index) def transform(column: Column, f: (Column) ⇒ Column): Column
功能:对数组中的每个元素应用transform函数,返回一个新的数组。
请看下面的应用示例。
1) 构造一个包含数组值的DataFrame:
val data = List( (Array("New York", "Seattle")), (Array("Barcelona", "Bangalore")), (null), ) // 创建DataFrame val df = data.toDF("places") df.printSchema df.show(false)
执行以上代码,输出结果如下:
root |-- places: array (nullable = true) | |-- element: string (containsNull = true) +----------------------+ |places | +----------------------+ |[New York, Seattle] | |[Barcelona, Bangalore]| |null | +----------------------+
对places列中的每个数组元素应用transform转换。
val resDF = df.withColumn("fun_places", transform(col("places"), concat(_, lit(" is fun!")) )) resDF.show(false)
执行以上代码,输出结果如下:
+----------------------+--------------------------------------+ |places |fun_places | +----------------------+--------------------------------------+ |[New York, Seattle] |[New York is fun!, Seattle is fun!] | |[Barcelona, Bangalore]|[Barcelona is fun!, Bangalore is fun!]| |null |null | +----------------------+--------------------------------------+
aggregate函数
方法签名有两个:
def aggregate(expr: Column, initialValue: Column, merge: (Column, Column) ⇒ Column): Column def aggregate(expr: Column, initialValue: Column, merge: (Column, Column) ⇒ Column, finish: (Column) ⇒ Column): Column
方法签名中各参数的含义如下:
- expr:输入数组列
- initialValue:初始值
- merge:(combined_value, input_value) => combined_value, merge函数将一个输入值合并到combined_value
- finish:combined_value => final_value, 这个lambda函数将所有输入的combined value转换为最终结果
功能:对数组元素进行指定的聚合。它将二元运算符应用于初始状态和数组中的所有元素,并将其简化为单一状态。如果有finish函数参数,则应用finish函数将最终状态转换为最终结果。
请看下面的应用示例。
1) 构造一个包含数组值的DataFrame:
val data = List( (Array(1, 2, 3, 4)), (Array(5, 6, 7)), (null), ) // 创建DataFrame val df = data.toDF("numbers") df.printSchema df.show(false)
执行以上代码,输出结果如下:
root |-- numbers: array (nullable = true) | |-- element: integer (containsNull = false) +------------+ |numbers | +------------+ |[1, 2, 3, 4]| |[5, 6, 7] | |null | +------------+
对numbers列的数组进行求和,计算的结果值作为新列numbers_sum的值。
val resDF = df.withColumn("numbers_sum", aggregate(col("numbers"), lit(0), _ + _ )) resDF.show
执行以上代码,输出结果如下:
+------------+-----------+ | numbers|numbers_sum| +------------+-----------+ |[1, 2, 3, 4]| 10| | [5, 6, 7]| 18| | null| null| +------------+-----------+
zip_with函数
方法签名:
def zip_with(left: Column, right: Column, f: (Column, Column) ⇒ Column): Column
方法签名中各参数的含义如下:
- left:左边的输入数组列
- right:右边的输入数组列
- f:(lCol, rCol) => col, 将两个输入列合并为一列的lambda函数
功能:使用该函数将两个给定的数组按元素合并为一个数组。如果一个数组较短,则在应用此函数之前,在其末尾添加空值以匹配较长的数组的长度。
请看下面的应用示例。
1) 构造一个包含数组值的DataFrame:
val data = List( (Array("a", "b"), Array("c", "d")), (Array("x", "y"), Array("p", "o")), (null, Array("e", "r")) ) // 列名 val columns = List("letters1","letters2") // 创建DataFrame val df = data.toDF(columns:_*) df.printSchema df.show(false)
执行以上代码,输出结果如下:
root |-- letters1: array (nullable = true) | |-- element: string (containsNull = true) |-- letters2: array (nullable = true) | |-- element: string (containsNull = true) +--------+--------+ |letters1|letters2| +--------+--------+ |[a, b] |[c, d] | |[x, y] |[p, o] | |null |[e, r] | +--------+--------+
val resDF = df.withColumn( "zipped_letters", zip_with( col("letters1"), col("letters2"), concat_ws("***", _, _) // (left: Column, right: Column) => concat_ws("***", left, right) ) ) resDF.show(false)
执行以上代码,输出结果如下:
+--------+--------+--------------+ |letters1|letters2|zipped_letters| +--------+--------+--------------+ |[a, b] |[c, d] |[a***c, b***d]| |[x, y] |[p, o] |[x***p, y***o]| |null |[e, r] |null | +--------+--------+--------------+