PySpark SQL集合元素处理函数
集合被设计用来处理复杂的数据类型,如arrays、maps和struts。本节将介绍两种特定类型的集合函数。第一种方法是使用数组数据类型,第二种方法是处理为JSON数据格式。
1. 数组处理函数
PySpark DataFrame支持复杂数据类型,也就是列值可以是一个集合。可以使用数组相关的集合函数来轻松获取数组大小、检查值的存在、或者对数组进行排序。下面的示例包含了处理各种数组的相关函数用法,代码如下:
from pyspark.sql import SparkSession spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark demo") \ .getOrCreate() # 创建一个任务集DataFrame tasksDF = spark.createDataFrame([("星期天", ["抽烟", "喝酒", "去烫头"])], ["day", "tasks"]) # tasksDF的schema tasksDF.printSchema() tasksDF.show()
执行以上代码,输出结果如下:
root |-- day: string (nullable = true) |-- tasks: array (nullable = true) | |-- element: string (containsNull = true) +------+--------------------+ |day | tasks | +------+--------------------+ |星期天 |[抽烟, 喝酒, 去烫头] | +------+--------------------+
接下来获得该数组的大小,对其进行排序,并检查在该数组中是否存在一个指定的值。代码如下:
tasksDF \ .select( "day", size("tasks").alias("size"), # 数组大小 sort_array("tasks").alias("sorted_tasks"), # 对数组排序 array_contains("tasks", "去烫头").alias("是否去烫头") # 是否包含 ) \ .show(truncate=False)
执行以上代码,输出结果如下:
+------+----+--------------------+----------+ |day |size|sorted_tasks | 是否去烫头| +------+----+--------------------+----------+ |星期天 |3 |[去烫头, 喝酒, 抽烟] | true | +------+----+--------------------+----------+
使用explode()表函数将为数组中的每一个元素创建一个新行,代码如下:
tasksDF.select("day", explode("tasks").alias("task")).show()
执行以上代码,输出结果如下:
+------+------+ | day| task| +------+------+ | 星期天| 抽烟| | 星期天| 喝酒| | 星期天| 去烫头| +------+------+
2. JSON处理函数
许多非结构化数据集都是以JSON的形式存在的。对于JSON数据类型的列,使用相关的集合函数将JSON字符串转换成struct(结构体)数据类型。主要的函数是from_json()、get_json_object()和to_json()。一旦JSON字符串被转换为PySpark struct数据类型,就可以轻松地提取这些值。下面的代码演示了from_json()和to_json()函数的示例。
首先构造一个带有JSON字符串内容的DataFrame,代码如下:
from pyspark.sql import SparkSession spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark demo") \ .getOrCreate() # 创建一个字符串,它包含有JSON格式的字符串内容 todos = """{"day": "星期天","tasks": ["抽烟", "喝酒", "去烫头"]}""" todoStrDF = spark.createDataFrame([(todos,)], ["todos_str"]) # 查看schema和内容 todoStrDF.printSchema() todoStrDF.show(truncate=False)
执行以上代码,输出结果如下:
root |-- todos_str: string (nullable = true) +-----------------------------------------------------+ |todos_str | +-----------------------------------------------------+ | {"day": "星期天","tasks": ["抽烟", "喝酒", "去烫头"]}| +-----------------------------------------------------+
为了将一个JSON字符串转换为一个PySpark结构体数据类型,需要将其结构描述给PySpark,为此需要定义一个Schema模式,并在from_json()函数中应用,代码如下:
from pyspark.sql.types import * todoSchema = StructType([ StructField("day", StringType(), True), StructField("tasks", ArrayType(StringType()), True) ]) # 使用from_json来转换JSON string todosDF = todoStrDF \ .select(from_json("todos_str",todoSchema).alias("todos")) # todos是一个struct数据类型,包含两个字段:day 和 tasks todosDF.printSchema() todosDF.show()
执行以上代码,输出结果如下:
root |-- todos: struct (nullable = true) | |-- day: string (nullable = true) | |-- tasks: array (nullable = true) | | |-- element: string (containsNull = true) +------------------------------+ | todos| +------------------------------+ | {星期天, [抽烟, 喝酒, 去烫头]}| +------------------------------+
可以使用Column类的getItem()函数检索出结构体数据类型的值,代码如下:
todosDF \ .select( col("todos").getItem("day"), col("todos").getItem("tasks"), col("todos").getItem("tasks")[0].alias("first_task") ) \ .show(truncate=False)
执行以上代码,输出结果如下:
+---------+--------------------+----------+ |todos.day|todos.tasks |first_task| +---------+--------------------+----------+ |星期天 |[抽烟, 喝酒, 去烫头] |抽烟 | +---------+--------------------+----------+
也可以使用to_json()函数将一个PySpark结构体数据类型转换为JSON格式字符串,代码如下:
todosDF.select(to_json("todos")).show(truncate=False)
执行以上代码,输出结果如下:
+-------------------------------------------------+ | to_json(todos) | +-------------------------------------------------+ | {"day":"星期天","tasks":["抽烟","喝酒","去烫头"]}| +-------------------------------------------------+