使用数据接收器Data Sink

流应用程序的最后一步通常是将计算结果写入一些外部系统或存储系统。结构化流提供了5个内置数据接收器(Data Sink)。其中三个是用于生产的,两个用于测试目的。下面的部分将详细介绍每个Data Sink。

1. 使用File Data Sink

File data sink是一个非常简单的数据接收器,需要提供的唯一必需选项是输出目录。由于File data sink是容错的,结构化的流将需要一个检查点位置来写进度信息和其他元数据,以帮助在出现故障时进行恢复。

【示例】配置Rate数据源,每秒产生10行数据,将生成的数据行发送到两个分区,并将数据以JSON格式写出到指定的目录。

实现的代码如下。

# 导入依赖包
from pyspark.sql import SparkSession

# 创建SparkSession实例
spark = SparkSession \
        .builder \
        .appName("streaming demo") \
        .getOrCreate()

# 设置shuffle后的分区数为10(测试环境下)
spark.conf.set("spark.sql.shuffle.partitions",10)

# 设置日志级别
# spark.sparkContext.setLogLevel("WARN") 

# 将数据从Rate数据源写出到File Sink
rateSourceDF = spark.readStream \
      .format("rate") \
      .option("rowsPerSecond","10") \
      .option("numPartitions","2") \
      .load()

# 将流DataFrame写出到指定的目录,并指定checkpoint
# "path"选项:设置输出目录;"checkpointLocation"选项:设置检查点位置
query = rateSourceDF.writeStream \
      .outputMode("append") \
      .format("json") \
      .option("path", "/tmp/output") \
      .option("checkpointLocation", "/tmp/ck") \
      .start()

# 等待流程序执行结束(作为作业文件提交时启用)
# query.awaitTermination()

由于分区的数量被配置为两个分区,所以每当结构化流在每个触发点上写出数据时,就会将两个文件写到输出目录中。因此,如果检查输出目录的话,将会看到带有名称的文件,这些名称以part-00000或part-00001开头。Rate数据源配置为每秒10行,并且有两个分区;因此,每个输出包含5行,内容如下所示。

part-00000-*.json:

{"timestamp":"2021-02-03T17:56:46.283+08:00","value":0}
{"timestamp":"2021-02-03T17:56:46.483+08:00","value":2}
{"timestamp":"2021-02-03T17:56:46.683+08:00","value":4}
{"timestamp":"2021-02-03T17:56:46.883+08:00","value":6}
{"timestamp":"2021-02-03T17:56:47.083+08:00","value":8}
{"timestamp":"2021-02-03T17:56:47.283+08:00","value":10}
{"timestamp":"2021-02-03T17:56:47.483+08:00","value":12}
{"timestamp":"2021-02-03T17:56:47.683+08:00","value":14}
{"timestamp":"2021-02-03T17:56:47.883+08:00","value":16}
{"timestamp":"2021-02-03T17:56:48.083+08:00","value":18}

part-00001-*.json:

{"timestamp":"2021-02-03T17:56:46.383+08:00","value":1}
{"timestamp":"2021-02-03T17:56:46.583+08:00","value":3}
{"timestamp":"2021-02-03T17:56:46.783+08:00","value":5}
{"timestamp":"2021-02-03T17:56:46.983+08:00","value":7}
{"timestamp":"2021-02-03T17:56:47.183+08:00","value":9}
{"timestamp":"2021-02-03T17:56:47.383+08:00","value":11}
{"timestamp":"2021-02-03T17:56:47.583+08:00","value":13}
{"timestamp":"2021-02-03T17:56:47.783+08:00","value":15}
{"timestamp":"2021-02-03T17:56:47.983+08:00","value":17}
{"timestamp":"2021-02-03T17:56:48.183+08:00","value":19}

2. 使用Kafka Data Sink

在结构化的流中,将流DataFrame的数据写入Kafka的data sink,要比从Kafka的数据源中读取数据要简单得多。

下面我们编写一个PySpark结构化流应用程序,它读取Rate数据源,将数据写到Kafka的指定主题中。

【示例】编写PySpark结构化流应用程序作为Kafka的生产者,将从Rate数据源读取的消息写入到Kafka的“rates”主题中。

在这个示例中,PySpark结构化流程序会向Kafka的“rates”主题发送消息(本例为读取自Rate数据源的数据),我们用Kafka自带的消费者脚本程序订阅该主题。一旦它收到了订阅的消息,马上输出。程序处理流程如下图所示:

首先,我们编写PySpark结构化流程序代码。实现如下:

# 导入依赖包
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# 创建SparkSession实例
spark = SparkSession \
        .builder \
        .appName("kafka data sink demo") \
        .getOrCreate()

# 设置shuffle后的分区数为10(测试环境下)
spark.conf.set("spark.sql.shuffle.partitions",10)

# 设置日志级别
# spark.sparkContext.setLogLevel("WARN") 

# 以每秒10行的速度设置Rate数据源,并使用两个分区
ratesSinkDF = spark.readStream \
      .format("rate") \
      .option("rowsPerSecond","10") \
      .option("numPartitions","2") \
      .load()

# 转换ratesSinkDF以创建一个"key"列和"value"列
# value列包含一个JSON字符串,该字符串包含两个字段:timestamp 和value
ratesSinkForKafkaDF = ratesSinkDF.select(
        col("value").cast("string").alias("key"),
        to_json(struct("timestamp","value")).alias("value")
)

# 设置一个流查询,使用topic "rates",将数据写到Kafka
query = ratesSinkForKafkaDF.writeStream \
      .outputMode("append") \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "xueai8:9092") \
      .option("topic","rates") \
      .option("checkpointLocation", "/tmp/rates") \
      .start()

# 等待流程序执行结束(作为作业文件提交时启用)
# query.awaitTermination()         # pyspark shell交互环境下,不需要这句

如果一切正常,应该可以看到在终端输出收到的订阅消息,如下(部分):

{"timestamp":"2021-02-03T19:14:59.352+08:00","value":1960}
{"timestamp":"2021-02-03T19:14:59.552+08:00","value":1962}
{"timestamp":"2021-02-03T19:14:59.752+08:00","value":1964}
{"timestamp":"2021-02-03T19:14:59.952+08:00","value":1966}
{"timestamp":"2021-02-03T19:15:00.152+08:00","value":1968}
{"timestamp":"2021-02-03T19:14:59.452+08:00","value":1961}
{"timestamp":"2021-02-03T19:14:59.652+08:00","value":1963}
{"timestamp":"2021-02-03T19:14:59.852+08:00","value":1965}
......

3. 使用Foreach Data Sink

与结构化流提供的其他内置data sinks相比,foreach data sink是一个很有意思的数据接收器,因为它根据数据应该如何被写出、何时写出数据以及将数据写入何处,提供了完整的灵活性,但这种灵活性和可扩展性是有要求的,即由我们自己来负责在使用这个数据接收器时写出数据的逻辑。

foreach和foreachBatch操作允许用户对流查询的输出应用任意操作和编写逻辑,但它们的应用场景略有不同—foreach允许在每一行上自定义写逻辑,而foreachBatch允许对每个微批处理的输出进行任意操作和自定义逻辑。

【示例】编写PySpark结构化流应用程序,通过将Rate数据源中的数据写入控制台,包含了一个ForeachWriter抽象类的简单实现。

首先定义一个ForeachWriter类的实现,并实现它的三个方法:open()、process()和close(),代码如下:

# 自定义一个ForeachWriter
class ForeachConsoleWriter:
    def __init__(self):
        self.p_id = 0
        self.e_id = 0
    
    def open(self, partition_id, epoch_id):
        self.p_id = partition_id
        self.e_id = epoch_id
        print(f"open => ({partition_id},{epoch_id})")
        return True
    
    def process(self, row):
        print(f"writing => {row}")
    
    def close(self, error):
        print(f"close => ({self.p_id}, {self.e_id})")

然后,在流数据写出时,指定forach()方法使用上面自定义的ForeachConsoleWriter。代码如下:

# 导入依赖包
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# 创建SparkSession实例
spark = SparkSession \
        .builder \
        .appName("foreach data sink demo") \
        .getOrCreate()

# 设置shuffle后的分区数为2(测试环境下)
spark.conf.set("spark.sql.shuffle.partitions",2)

# 设置日志级别
# spark.sparkContext.setLogLevel("WARN") 

# 以每秒10行的速度设置Rate数据源,并使用两个分区
ratesSourceDF = spark.readStream \
      .format("rate") \
      .option("rowsPerSecond","10") \
      .option("numPartitions","2") \
      .load()

# 设置Foreach data sink
query = ratesSourceDF \
	.writeStream \
	.foreach(ForeachConsoleWriter()) \
	.outputMode("append") \
	.start()

# 等待流程序执行结束(作为作业文件提交时启用)
# query.awaitTermination()

当开始执行时,可以看到控制台的输出,类似下面这样:

open => (0, 1)
writing => [2021-02-03 19:52:53.194,0]
writing => [2021-02-03 19:52:53.394,2]
writing => [2021-02-03 19:52:53.594,4]
writing => [2021-02-03 19:52:53.794,6]
writing => [2021-02-03 19:52:53.994,8]
close => (0, 1)
open => (1, 1)
writing => [2021-02-03 19:52:53.294,1]
writing => [2021-02-03 19:52:53.494,3]
writing => [2021-02-03 19:52:53.694,5]
writing => [2021-02-03 19:52:53.894,7]
writing => [2021-02-03 19:52:54.094,9]
close => (1, 1)
open => (0, 2)
writing => [2021-02-03 19:52:54.194,10]
writing => [2021-02-03 19:52:54.394,12]
writing => [2021-02-03 19:52:54.594,14]
writing => [2021-02-03 19:52:54.794,16]
writing => [2021-02-03 19:52:54.994,18]
close => (0, 2)
open => (1, 2)
writing => [2021-02-03 19:52:54.294,11]
writing => [2021-02-03 19:52:54.494,13]
writing => [2021-02-03 19:52:54.694,15]
writing => [2021-02-03 19:52:54.894,17]
writing => [2021-02-03 19:52:55.094,19]
close => (1, 2)
...

4. 使用Console Data Sink

这个数据接收器非常简单,但它不是一个容错的data sink。它主要用于学习和测试,不能在生产环境下使用。它只有两种选项配置:要显示的行数,以及输出太长时是否截断。这些选项都有一个默认值,如下表所示。这个数据接收器的底层实现使用与DataFrame.show方法相同的逻辑来显示流DataFrame中的数据。

Option 默认值 描述
numRows 20 在控制台输出的行的数量
truncate true 当每一行的内容超过20个字符时,是否截断显示

下面通过一个示例来了解这些option参数的用法。

【示例】编写PySpark结构化流程序,读取rate数据源流数据,并将流数据处理结果写出到控制台,每次输出不超过3行。

代码实现如下。

# 导入依赖包
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# 创建SparkSession实例
spark = SparkSession \
        .builder \
        .appName("console data sink demo") \
        .getOrCreate()

# 设置shuffle后的分区数为2(测试环境下)
spark.conf.set("spark.sql.shuffle.partitions",2)

# 设置日志级别
# spark.sparkContext.setLogLevel("WARN") 

# 以每秒10行的速度设置Rate数据源,并使用两个分区
ratesSourceDF = spark.readStream \
      .format("rate") \
      .option("rowsPerSecond","10") \
      .option("numPartitions","2") \
      .load()

# 将结果DataFrame写出到控制台
query = ratesSourceDF.writeStream \
      .outputMode("append") \
      .format("console") \
      .option("truncate",value = False) \
      .option("numRows",3) \
      .start()

# 等待流程序执行结束(作为作业文件提交时启用)
# query.awaitTermination()		# spark-shell交互环境下不需要

执行上面的程序,输出结果如下:

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2021-02-03 20:07:36.987|0    |
|2021-02-03 20:07:37.187|2    |
|2021-02-03 20:07:37.387|4    |
|2021-02-03 20:07:37.587|6    |
|2021-02-03 20:07:37.787|8    |
|2021-02-03 20:07:37.087|1    |
|2021-02-03 20:07:37.287|3    |
|2021-02-03 20:07:37.487|5    |
|2021-02-03 20:07:37.687|7    |
|2021-02-03 20:07:37.887|9    |
+-----------------------+-----+
...

5. 使用Memory Data Sink

与Console data sink类似,这个数据接收器也很容易理解和使用。事实上,它非常简单,不需要任何配置。它也不是一个容错的data sink,主要用于学习和测试,不在生产环境中使用。它收集的数据被发送给Driver,并作为内存中的表存储在Driver中。换句话说,可以发送到Memory data sink的数据量是由Driver中JVM拥有的内存大小决定的。

下面通过一个示例来了解Memory数据接收器的用法。

【示例】编写PySpark结构化流程序,读取rate数据源流数据,并流数据处理结果写出到内存表中,然后对该内存表发出。

代码实现如下。

# 导入依赖包
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# 创建SparkSession实例
spark = SparkSession \
        .builder \
        .appName("console data sink demo") \
        .getOrCreate()

# 设置shuffle后的分区数为2(测试环境下)
spark.conf.set("spark.sql.shuffle.partitions",2)

# 设置日志级别
# spark.sparkContext.setLogLevel("WARN") 

# 以每秒10行的速度设置Rate数据源,并使用两个分区
ratesSourceDF = spark.readStream \
      .format("rate") \
      .option("rowsPerSecond","10") \
      .option("numPartitions","2") \
      .load()

# 将数据写出到Memory data sink,内存表名为"rates"
query = ratesSourceDF.writeStream \
      .outputMode("append") \
      .format("memory") \
      .queryName("rates_tb") \
      .start()

# 针对“rates”内存表发出SQL查询
spark.sql("select * from rates_tb order by value desc").show(truncate=False)

# 统计“rates”内存表中的行数
spark.sql("select count(*) from rates_tb").show(truncate=False)

# 等待流程序执行结束(作为作业文件提交时启用)
# query.awaitTermination()	# 在pyspark shell交互环境下,不需要这一句

需要注意的一点是,即使在流查询ratesSQ停止之后,内存中的rates仍然会存在。然而,一旦一个新的流查询以相同的名称开始,那么来自内存中的数据就会被截断。


《Spark原理深入与编程实战》