使用水印

在流处理引擎中,水印是一种常用的技术,用于处理延迟数据,以及限制维护它所需的状态数量。

限制聚合状态数量

通过应用于事件时间上的窗口聚合(固定窗口聚合或滑动窗口聚合),在Spark结构化流中可以很容易地执行常见的和复杂的流处理操作。虽然表面上看似乎很容易,但在其内部,结构化流引擎和Spark SQL引擎协同工作,在执行流聚合时,以容错的方式维护中间聚合结果。

事实上,任何时候在流查询上执行聚合时,都必须维护中间聚合状态。这个状态保存在key-value对结构中,类似于散列映射(hash map),其中key是group name,value是中间聚合值。在上一节的例子中,通过滑动窗口和机架ID进行聚合,其中key就是窗口的开始和结束时间以及机架名称的组合值,而value则是平均温度。

中间状态存储在Spark executors的内存中、版本化的key-value状态存储中,并将其写到一个预写日志中(该日志应该被配置为驻留在像HDFS这样的稳定存储系统中)。在每个触发点上,该状态都在内存中的状态存储中读取和更新,然后写入到预写日志中。在失败的情况下,当Spark结构化的流应用程序重新启动时,状态从预写日志中还原,从那个点恢复。这种容错的状态管理显然会在结构化流引擎中产生一些资源和处理开销。因此,开销的大小与它需要维护的状态量成正比,因此,保持状态的数量在一个可以接受大小是很重要的;换句话说,状态的规模不应该无限增长。

考虑到滑动窗口的性质,窗口的数量将会无限增长。这意味着执行滑动窗口聚合会导致中间状态无限地增长,因此,必须有一种方法可以删除不再更新的旧状态。在PySpark结构化流处理技术中,这是通过叫做“水印(watermarking)”的技术完成的。

指定水印的最大好处之一是能让结构化流引擎可以安全地删除比水印更古老的窗口的聚合状态。生产环境下执行任何类型聚合的流应用程序都应该指定一个水印来避免内存不足的问题。

处理迟到的数据

在现实世界中,流数据往往会不按顺序到达,以及因为网络拥挤、网络中断或数据生成器(如移动设备等)不在线而延迟到达。作为一个实时流应用程序的开发人员,必须要知道想要怎样处理比某个阈值晚一些的数据。换句话说,数据延迟到达时间量是多少时才是可以接受的,或者说对这个时间量之后迟到的数据置之不理?这取决于应用场景。

从结构化流的角度来看,水印是事件时间(event time)的移动阈值,它位于目前所见的最新事件时间之后。随着新事件不断到达,这将导致水印也不断移动。

【示例】使用水印来处理延迟到达的移动电话操作事件数据。

请按以下步骤操作。

1)准备数据

在本示例中,我们使用文件数据源,代表移动电话操作事件数据存储在两个JSON格式的文件中。每个事件由三个字段组成:

  • id:表示手机的唯一ID,字符串类型。
  • action:表示用户所采取的操作。该操作的可能值是"open"或"close"。
  • ts:表示用户action发生时的时间戳。这是事件时间(event time)。

两个数据文件file1.json和file2.json,位于PBLP平台的如下位置:~/data/spark/phones。

注意观察这两个数据文件中的数据。数据以这样一种方式设置,即file1.json文件中的每一行进入了它自己的10分钟窗口,那么file1.json的处理会形成三个窗口:10:10:00-10:20:00、10:20:00-10:30:00和10:30:00-10:40:00。我们指定水印为10分钟。file2.json文件中的数据代表迟到的数据,其中第一行落在10:20:00-10:30:00窗口中,所以即使它到达的时间较晚,它的时间戳仍然在水印的阈值范围内,因此它将被处理。file2.json文件中的最后一行数据的时间戳在10:10:00-10:20:00窗口中,由于它超出了水印的阈值,所以它将被忽略,而不会被处理。

为了模拟数据流的行为,我们将把这两个JSON文件复制到项目的“src/main/resources/mobile3”目录下。

2)代码编写。

实现的流查询代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# 创建SparkSession实例
spark = SparkSession \
        .builder \
        .appName("late data demo") \
        .getOrCreate()

# 设置shuffle后的分区数为2(测试环境下)
spark.conf.set("spark.sql.shuffle.partitions",2)

# 设置日志级别
# spark.sparkContext.setLogLevel("WARN") 

# 为手机事件数据创建一个schema
fields = [
      StructField("id", StringType(), nullable = False),
      StructField("action", StringType(), nullable = False),
      StructField("ts", TimestampType(), nullable = False)
]
mobileDataSchema = StructType(fields)

# 指定监听的文件目录
dataPath = "/data/spark/stream/mobile4"

# 读取指定目录下的源数据文件,一次一个
mobileDF = spark.readStream \
      .option("maxFilesPerTrigger", 1) \
      .option("mode","failFast") \
      .schema(mobileDataSchema) \
      .json(dataPath)
   
# 设置一个带有水印的流DataFrame,并按ts和action列分组
# 水印,10分钟,必须先于groupBy调用
# 指定的窗口列必须与水印中指定的列一致
windowCountDF = mobileDF \
      .withWatermark("ts", "10 minutes") \
      .groupBy(window("ts", "10 minutes"), "action") \
      .count()

windowCountDF.printSchema()

# 输出到控制台
windowCountDF \
      .select("window.start", "window.end", "action", "count") \
      .writeStream \
      .format("console") \
      .option("truncate", "false") \
      .outputMode("update") \
      .start()

# 等待流程序执行结束(作为作业文件提交时启用)
# query.awaitTermination()

3)执行程序,输出源数据的结构:

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- action: string (nullable = false)
 |-- count: long (nullable = false)

当它读取到第一个流数据文件file1.json时,输出结果如下。

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+-------------------+------+-----+
|start              |end                |action|count|
+-------------------+-------------------+------+-----+
|2018-03-02 10:30:00|2018-03-02 10:40:00|open  |1    |
|2018-03-02 10:10:00|2018-03-02 10:20:00|open  |1    |
|2018-03-02 10:20:00|2018-03-02 10:30:00|open  |1    |
+-------------------+-------------------+------+-----+

正如期望的,每一行都落在它自己的窗口内。

当它读取到第一个流数据文件file2.json时,输出结果如下。

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+-------------------+------+-----+
|start              |end                |action|count|
+-------------------+-------------------+------+-----+
|2018-03-02 10:20:00|2018-03-02 10:30:00|open  |2    |
+-------------------+-------------------+------+-----+

注意到窗口10:20:00-10:30:00的count现在被更新为2,窗口10:10:00- 10:20:00没有变化。如前所述,因为file2.json文件中的最后一行的时间戳落在10分钟的水印阈值之外,因此它不会被处理。

4)如果删除对Watermark API的调用,那么输出结果如下所示。

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+-------------------+------+-----+
|start              |end                |action|count|
+-------------------+-------------------+------+-----+
|2018-03-02 10:10:00|2018-03-02 10:20:00|open  |2    |
|2018-03-02 10:20:00|2018-03-02 10:30:00|open  |2    |
+-------------------+-------------------+------+-----+

可以看出,因为没有指定水印,所以迟到的数据也不会被删除,所以对窗口10:10:00- 10:20:00的count计数被更新为2。


《PySpark原理深入与编程实战》