PySpark结构化流核心概念

PySpark结构化流(Structured Streaming)应用程序包括以下主要部分:

  • 指定一个或多个流数据源。
  • 提供了以DataFrame转换的形式操纵传入数据流的逻辑。
  • 定义输出模式和触发器(都有默认值,所以是可选的)。

最后指定一个将结果写出到的数据接收器(data sink)。

下面的部分将详细描述这些概念。

数据源(Data Sources)

对于批处理,数据源是驻留在某些存储系统上的静态数据集,如本地文件系统、HDFS或S3。结构化流的数据源是完全不同的。他们生产的数据是连续的,可能永远不会结束,而且生产速率也会随着时间而变化。

Spark结构化流提供了以下开箱即用的数据源:

  • Kafka源:要求Apache Kafka的版本是0.10或更高版本。这是生产环境中最流行的数据源。连接和读取来自Kafka主题的数据需要提供一组特定的设置。
  • 文件源:文件位于本地文件系统、HDFS或S3上。当新的文件被放入一个目录中时,这个数据源将会把它们挑选出来进行处理。支持常用的文件格式,如文本、CSV、JSON、ORC和Parquet。在处理这个数据源时,一个好的实践是先完全地写出输入文件,然后再将它们移动到这个数据源的路径中。(例如,流程序监控的是HDFS上的A目录,那么先将输入文件写出到HDFS的B目录中,再从B目录将它们移动到A目录)
  • Socket源:这仅用于测试目的。它从一个监听特定的主机和端口的socket上读取UTF-8数据。
  • Rate源:这仅用于测试和基准测试。这个源可以被配置为每秒产生许多事件,其中每个事件由时间戳和一个单调递增的值组成。这是学习结构化流时使用的最简单的源。

数据源需要提供的一个重要的属性是一种跟踪流中的读位置的方法,用于结构化的流来传递端到端、精确一次性保证。例如,Kafka的数据源提供了一个Kafka的偏移量来跟踪一个主题分区的读位置。这个属性决定一个特定的数据源是否具有容错能力。

下表描述了每个开箱即用数据源的一些选项。

数据源 是否容错 配置
File path:输入目录的路径
maxFilesPerTrigger:每个触发器读取新行的最大数量
latestFirst:是否处理最新的文件(根据modification time)
Socket 要求有如下的参数:
- host:要连接到的主机
- port:要连接到的端口号
Rate rowsPerSecond:每秒钟生成的行的数量
rampUpTime:在到达rowsPerSecond之前的时间,以秒为单位
numPartitions:分区的数量
Kafka kafka.bootstrap.servers:Kafka brokers列表,以逗号分隔的host:port
subscribe:主题列表,以逗号分隔

输出模式

输出模式是一种方法,可以告诉结构流如何将输出数据写入到sink中。这个概念对于Spark中的流处理来说是独一无二的。输出模式有三个选项。

  • append模式:如果没有指定输出模式,这是默认模式。在这种模式下,只有追加到结果表的新行才会被发送到指定的输出接收器。只有自上次触发后在结果表中附加的新行将被写入外部存储器。这仅适用于结果表中的现有行不会更改的查询。
  • complete模式:此模式将数据完全从内存写入接收器,即整个结果表将被写到输出接收器。当对流数据执行聚合查询时,就需要这种模式。
  • update模式:只有自上次触发后在结果表中更新的行才会被写到输出接收器中。对于那些没有改变的行,它们将不会被写出来。注意,这与complete模式不同,因为此模式不输出未更改的行。

触发器类型

触发器是另一个需要理解的重要概念。结构化流引擎使用触发器信息来确定何时在流应用程序中运行提供的流计算逻辑。下表描述了不同的触发类型。

类型 描述
未指定(默认) 对于默认类型,Spark将使用微批模型,并且当前一批数据完成处理后,立即处理下一批数据。
固定周期 对于这个类型,Spark将使用微批模型,并基于用户提供的周期处理这批数据。如果因为任何原因导致上一批数据的处理超过了该周期,那么前一批数据完成处理后,立即处理下一批数据。换句话说,Spark将不会等到下一个周期区间边界。
一次性 这个触发器类型意味着用于一次性处理可用的批数据,并且一旦该处理完成的话,Spark将立即停止流程序。当数据量特别低时,这个触发器很有用,因此,构建一个集群并每天处理几次数据更划算。
持续 这个触发器类型调用新的持续处理模型,该模型是设计用于非常低延迟需求的特定流应用程序的。这是Spark 2.3中新的实验性处理模式。这个时候将支持“最少一次性”保证。

数据接收器(Data Sinks)

数据接收器是用来存储流应用程序的输出的。不同的sinks可以支持不同的输出模式,并且具有不同的容错能力,了解这一点很重要。

Spark结构化流支持以下几种数据接收器:

  • Kafka sink:要求Apache Kafka的版本是0.10或更高版本。有一组特定的设置可以连接到Kafka集群。
  • File sink:这是文件系统、HDFS或S3的目的地。支持常用的文件格式,如文本、CSV、JSON、ORC、Parquet。
  • Foreach sink:这是为了在输出中的行上运行任意计算。
  • Console sink:这仅用于测试和调试目的,以及在处理低容量数据时。每个触发器上输出被打印到控制台。
  • Memory sink:这是在处理低容量数据时进行测试和调试的目的。它使用驱动程序的内存来存储输出。

下表列出了每个sink的各种选项:

名称 支持的输出模式 是否容错 配置
File Append path:这是输入目录的路径。支持所有流行的文件格式。详细信息可查看DataFrameWriter API。
Foreach Append
Update
Complete
依情况而定 这是一个非常灵活的接收器,它是特定于实现的。
Console Append
Update
Complete
numRows:这是每个触发器输出的行的数量。默认是20行。
truncate:如果每一行太长的话,是否截断。默认是true。
Memory Append
Complete
N/A
Kafka Append
Update
Complete
kafka.bootstrap.servers:Kafka brokers列表,以逗号分隔的host:port
topic:这是写入数据的Kafka主题

数据接收器必须支持的一个重要的属性(用于结构化的流交付端到端、精确一次性保证)是处理重做的幂等性。换句话说,它必须能够处理使用相同数据的多个写(在不同的时间发生),结果就像只有一个写一样。多重写是在故障场景中重新处理数据的结果。

水印(Watermarking)

数字水印是流处理引擎中常用的一种技术,用于处理迟到的数据。流应用程序开发人员可以指定一个阈值,让结构化的流引擎知道数据在事件时间(event time)内的预期延迟时间。有了这个信息,超过这个预期延迟时间到达的迟到数据会被丢弃。更重要的是,结构化流使用指定的阈值来确定何时可以丢弃旧状态。没有这些信息,结构化流将需要无限期地维护所有状态,这将导致流应用程序的内存溢出问题。任何执行某种聚合或连接的生产环境下的结构化流应用程序都需要指定水印。这是一个重要的概念,关于这个主题的更多细节将在后面的部分中讨论和说明。


《Spark原理深入与编程实战》