结构化流DataFrame操作

前面的例子表明,一旦配置和定义了数据源,DataStreamReader将返回一个DataFrame的实例。这意味着我们可以使用大多数熟悉的操作和PySpark SQL函数来表达应用程序流计算逻辑。但是要注意,并不是所有的DataFrame操作都受流式DataFrame支持的,比如limit、distinct和sort就不能在流DataFrame上使用,这是因为它们在流数据处理的上下文中不适用。

选择、投影和聚合操作

结构化流的一个优点是具有一组用于PySpark的批处理和流处理的统一API。使用流数据格式的DataFrame,可以应用任何select和filter转换,以及任何作用在个别列上的PySpark SQL函数。此外,基本聚合和高级分析函数也可用于流DataFrame。

【示例】移动电话事件数据流分析。

移动电话的开关机等事件会保存在json格式的文件中。现在编写PySpark结构化流处理程序来读取这些事件并处理。请按以下步骤操作。

1)准备数据

在本示例中,我们使用文件数据源,该数据源以json文件的格式记录了一小组移动电话动作事件。每个事件由三个字段组成:

  • id:表示手机的唯一ID。在样例数据集中,电话ID将类似于phone1、phone2、phone3等。
  • action:表示用户所采取的操作。该操作的可能值是"open"或"close"。
  • ts:表示用户action发生时的时间戳。这是事件时间(event time)。

我们准备了三个存储移动电话事件数据的JSON文件:file1.json, file2.json, file3.json。这三个文件位于PBLP平台的~/data/spark/mobile目录下。

为了模拟数据流的行为,我们将把这三个JSON文件复制到项目的“src/main/data/mobile”目录下。

2)先导入相关的依赖包,并构造一个SparkSession实例,代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# 创建SparkSession实例
spark = SparkSession \
        .builder \
        .appName("streaming demo") \
        .getOrCreate()

# 设置shuffle后的分区数为10(测试环境下)
spark.conf.set("spark.sql.shuffle.partitions",10)

3)为手机事件数据创建模式(schema)

默认情况下,结构化流在从基于文件的数据源读取数据时需要一个模式(因为最初目录可能是空的,因此结构化的流无法推断模式)。但是,可以设置配置参数spark.sql.streaming.schemaInference的值为true来启用模式推断。在这个例子中,我们将显式地创建一个模式,代码如下所示:

# 为手机事件数据创建一个schema
fields = [
      StructField("id", StringType(), nullable = False),
      StructField("action", StringType(), nullable = False),
      StructField("ts", TimestampType(), nullable = False)
]
mobileDataSchema = StructType(fields)

3)读取流文件数据源,创建DataFrame,并将action列值转换为大写,代码如下:

# 指定监听的文件目录
dataPath = "/data/spark/stream/mobile"

# 读取指定目录下的源数据文件,一次一个
mobileDF = spark.readStream \
      .option("maxFilesPerTrigger", 1) \
      .option("mode","failFast") \
      .schema(mobileDataSchema) \
      .json(dataPath)

# mobileSSDF.printSchema()

4)将action列值转换为大写,执行过滤、投影、聚合等转换操作,代码如下:

mobileDF2 = mobileDF \
      .where("action='open' or action='close'") \
      .withColumn("action",upper(col("action"))) \
      .select("id","action","ts") \
      .groupBy("action") \
      .count()

5)将结果DataFrame输出到控制台显示,代码如下:

# 结果输出到控制台
query = mobileDF2.writeStream \
      .format("console") \
      .option("truncate","false") \
      .outputMode("complete") \
      .start()

# 等待流程序执行结束(作为作业文件提交时启用)
# query.awaitTermination()

6)执行流处理程序,输出结果如下:

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
|action|count|
+------+-----+
|CLOSE |1    |
|OPEN  |3    |
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|action|count|
+------+-----+
|CLOSE |2    |
|OPEN  |4    |
+------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
|action|count|
+------+-----+
|CLOSE |3    |
|OPEN  |5    |
+------+-----+

完整的代码如下。

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# 创建SparkSession实例
spark = SparkSession \
        .builder \
        .appName("streaming demo") \
        .getOrCreate()

# 设置shuffle后的分区数为10(测试环境下)
spark.conf.set("spark.sql.shuffle.partitions",10)

# 为手机事件数据创建一个schema
fields = [
      StructField("id", StringType(), nullable = False),
      StructField("action", StringType(), nullable = False),
      StructField("ts", TimestampType(), nullable = False)
]
mobileDataSchema = StructType(fields)

# 指定监听的文件目录
dataPath = "/data/spark/stream/mobile"

# 读取指定目录下的源数据文件,一次一个
mobileDF = spark.readStream \
      .option("maxFilesPerTrigger", 1) \
      .option("mode","failFast") \
      .schema(mobileDataSchema) \
      .json(dataPath)

mobileDF2 = mobileDF \
      .where("action='open' or action='close'") \
      .withColumn("action",upper(col("action"))) \
      .select("id","action","ts") \
      .groupBy("action") \
      .count()

# 结果输出到控制台
query = mobileDF2.writeStream \
      .format("console") \
      .option("truncate","false") \
      .outputMode("complete") \
      .start()

# 等待流程序执行结束(作为作业文件提交时启用)
query.awaitTermination()

在这个示例中,我们采用的输出模式是“complete”。在没有聚合操作的情况下,不能使用“complete”输出模式;在有聚合操作的情况下,不能使用“append”模式。

需要注意,在流DataFrame中,不支持以下DataFrame转换(因为它们太过复杂,无法维护状态,或者由于流数据的无界性):

  • 在流DataFrame上的多个聚合或聚合链。
  • limit和take N行。
  • distinct转换。
  • 在没有任何聚合的情况下对流DataFrame进行排序。

任何使用不受支持的操作的尝试都会导致一个AnalysisException异常以及类似“XYZ操作不受流streaming DataFrame/Datasets支持”这样的消息。


《Spark原理深入与编程实战》