使用数据接收器Data Sink

流应用程序的最后一步通常是将计算结果写入一些外部系统或存储系统。结构化流提供了5个内置数据接收器(Data Sink)。其中三个是用于生产的,两个用于测试目的。下面的部分将详细介绍每个Data Sink。

使用File Data Sink

File data sink是一个非常简单的数据接收器,需要提供的唯一必需选项是输出目录。由于File data sink是容错的,结构化的流将需要一个检查点位置来写进度信息和其他元数据,以帮助在出现故障时进行恢复。

【示例】配置Rate数据源,每秒产生10行数据,将生成的数据行发送到两个分区,并将数据以JSON格式写出到指定的目录。

实现的代码如下。

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local")
      .appName("file sink")
      .getOrCreate()
    
    // spark.sparkContext.setLogLevel("WARN")      	// 设置日志级别

    // 将数据从Rate数据源写出到File Sink
    val rateSourceDF = spark.readStream
      .format("rate")
      .option("rowsPerSecond","10")               	// 每秒产生10条数据
      .option("numPartitions","2")                	// 两个分区
      .load()

    val query = rateSourceDF.writeStream
      .outputMode("append")
      .format("json")                             	// or "csv"
      .option("path", "tmp/output")               	// 设置输出目录
      .option("checkpointLocation", "tmp/ck")     	// 设置checkpoint
      .start()

    // 等待流程序结束
    query.awaitTermination()
  }

由于分区的数量被配置为两个分区,所以每当结构化流在每个触发点上写出数据时,就会将两个文件写到输出目录中。因此,如果检查输出目录的话,将会看到带有名称的文件,这些名称以part-00000或part-00001开头。Rate数据源配置为每秒10行,并且有两个分区;因此,每个输出包含5行,内容如下所示。

part-00000-*.json:

{"timestamp":"2021-02-03T17:56:46.283+08:00","value":0}
{"timestamp":"2021-02-03T17:56:46.483+08:00","value":2}
{"timestamp":"2021-02-03T17:56:46.683+08:00","value":4}
{"timestamp":"2021-02-03T17:56:46.883+08:00","value":6}
{"timestamp":"2021-02-03T17:56:47.083+08:00","value":8}
{"timestamp":"2021-02-03T17:56:47.283+08:00","value":10}
{"timestamp":"2021-02-03T17:56:47.483+08:00","value":12}
{"timestamp":"2021-02-03T17:56:47.683+08:00","value":14}
{"timestamp":"2021-02-03T17:56:47.883+08:00","value":16}
{"timestamp":"2021-02-03T17:56:48.083+08:00","value":18}

part-00001-*.json:

{"timestamp":"2021-02-03T17:56:46.383+08:00","value":1}
{"timestamp":"2021-02-03T17:56:46.583+08:00","value":3}
{"timestamp":"2021-02-03T17:56:46.783+08:00","value":5}
{"timestamp":"2021-02-03T17:56:46.983+08:00","value":7}
{"timestamp":"2021-02-03T17:56:47.183+08:00","value":9}
{"timestamp":"2021-02-03T17:56:47.383+08:00","value":11}
{"timestamp":"2021-02-03T17:56:47.583+08:00","value":13}
{"timestamp":"2021-02-03T17:56:47.783+08:00","value":15}
{"timestamp":"2021-02-03T17:56:47.983+08:00","value":17}
{"timestamp":"2021-02-03T17:56:48.183+08:00","value":19}

使用Kafka Data Sink

在结构化的流中,将流DataFrame的数据写入Kafka的data sink,要比从Kafka的数据源中读取数据要简单得多。

下面我们编写一个Spark结构化流应用程序,它读取Rate数据源,将数据写到Kafka的指定主题中。

【示例】编写Spark结构化流应用程序作为Kafka的生产者,将从Rate数据源读取的消息写入到Kafka的“rates”主题中。

在这个示例中,Spark结构化流程序会向Kafka的“rates”主题发送消息(本例为读取自Rate数据源的数据),我们用Kafka自带的消费者脚本程序订阅该主题。一旦它收到了订阅的消息,马上输出。程序处理流程如下图所示:

首先,我们编写Spark结构化流程序代码。实现如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
......

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("kafka sink")
      .getOrCreate()

    // spark.sparkContext.setLogLevel("WARN")      // 设置日志级别

    // 以每秒10行的速度设置Rate数据源,并使用两个分区
    val ratesSinkDF = spark.readStream
      .format("rate")
      .option("rowsPerSecond","10")
      .option("numPartitions","2")
      .load()

    // 转换ratesSinkDF以创建一个"key"列和"value"列
    // value列包含一个JSON字符串,该字符串包含两个字段:timestamp 和value
    val ratesSinkForKafkaDF = ratesSinkDF
      .select(
        col("value").cast("string") as "key",
        to_json(struct("timestamp","value")) as "value"
      )

    // 设置一个流查询,使用topic "rates",将数据写到Kafka
    val query = ratesSinkForKafkaDF.writeStream
      .outputMode("append")
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic","rates")
      .option("checkpointLocation", "tmp/rates")
      .start()

    // 等待流程序结束
    query.awaitTermination()
  }

如果一切正常,应该可以看到在终端输出收到的订阅消息,如下(部分):

{"timestamp":"2021-02-03T19:14:59.352+08:00","value":1960}
{"timestamp":"2021-02-03T19:14:59.552+08:00","value":1962}
{"timestamp":"2021-02-03T19:14:59.752+08:00","value":1964}
{"timestamp":"2021-02-03T19:14:59.952+08:00","value":1966}
{"timestamp":"2021-02-03T19:15:00.152+08:00","value":1968}
{"timestamp":"2021-02-03T19:14:59.452+08:00","value":1961}
{"timestamp":"2021-02-03T19:14:59.652+08:00","value":1963}
{"timestamp":"2021-02-03T19:14:59.852+08:00","value":1965}
......

使用Foreach Data Sink

与结构化流提供的其他内置data sinks相比,foreach data sink是一个很有意思的数据接收器,因为它根据数据应该如何被写出、何时写出数据以及将数据写入何处,提供了完整的灵活性,但这种灵活性和可扩展性是有要求的,即由我们自己来负责在使用这个数据接收器时写出数据的逻辑。

要使用foreach接收器,必须实现ForeachWriter接口(注意,该接口只支持Scala/Java),它包含三个方法:open、process和close。只要有一个触发器的输出生成一系列的行,这些方法就会被调用。

【示例】编写Spark结构化流应用程序,通过将Rate数据源中的数据写入控制台,包含了一个ForeachWriter抽象类的简单实现。

首先导入依赖的类:

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

接下来定义一个ForeachWriter抽象类的实现,并实现它的三个方法:open、process和close。

  // 自定义一个ConsoleWriter,继承自ForeachWriter
  class ConsoleWriter(private var pId:Long = 0, private var ver:Long = 0) extends ForeachWriter[Row] {

    // 初始化方法
    def open(partitionId: Long, version: Long): Boolean = {
      pId = partitionId     // 分区id
      ver = version         // 版本号
      println(s"open => ($partitionId, $version)")
      true
    }

    // 业务处理方法
    def process(row: Row): Unit = {
      println(s"writing => $row")
    }

    // 做一些清理性的工作
    def close(errorOrNull: Throwable): Unit = {
      println(s"close => ($pId, $ver)")
    }
  }

然后,在流数据写出时,指定forach方法使用上面自定义的ConsoleWriter。

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("foreach sink")
      .getOrCreate()

    // spark.sparkContext.setLogLevel("WARN")      // 设置日志级别

    // 以每秒10行的速度设置Rate数据源,并使用两个分区
    val ratesSourceDF = spark.readStream
      .format("rate")
      .option("rowsPerSecond","10")
      .option("numPartitions","2")
      .load()

    // 设置Foreach data sink
    val query = ratesSourceDF.writeStream
      .foreach(new ConsoleWriter)
      .start()

    // 等待流程序结束
    query.awaitTermination()
  }

当开始执行时,可以看到控制台的输出,类似下面这样:

open => (0, 1)
writing => [2021-02-03 19:52:53.194,0]
writing => [2021-02-03 19:52:53.394,2]
writing => [2021-02-03 19:52:53.594,4]
writing => [2021-02-03 19:52:53.794,6]
writing => [2021-02-03 19:52:53.994,8]
close => (0, 1)
open => (1, 1)
writing => [2021-02-03 19:52:53.294,1]
writing => [2021-02-03 19:52:53.494,3]
writing => [2021-02-03 19:52:53.694,5]
writing => [2021-02-03 19:52:53.894,7]
writing => [2021-02-03 19:52:54.094,9]
close => (1, 1)
open => (0, 2)
writing => [2021-02-03 19:52:54.194,10]
writing => [2021-02-03 19:52:54.394,12]
writing => [2021-02-03 19:52:54.594,14]
writing => [2021-02-03 19:52:54.794,16]
writing => [2021-02-03 19:52:54.994,18]
close => (0, 2)
open => (1, 2)
writing => [2021-02-03 19:52:54.294,11]
writing => [2021-02-03 19:52:54.494,13]
writing => [2021-02-03 19:52:54.694,15]
writing => [2021-02-03 19:52:54.894,17]
writing => [2021-02-03 19:52:55.094,19]
close => (1, 2)
......

使用Console Data Sink

这个数据接收器非常简单,但它不是一个容错的data sink。它主要用于学习和测试,不能在生产环境下使用。它只有两种选项配置:要显示的行数,以及输出太长时是否截断。这些选项都有一个默认值,如下表所示。这个数据接收器的底层实现使用与DataFrame.show方法相同的逻辑来显示流DataFrame中的数据。

Option 默认值 描述
numRows 20 在控制台输出的行的数量
truncate true 当每一行的内容超过20个字符时,是否截断显示

下面通过一个示例来了解这些option参数的用法。

【示例】编写Spark结构化流程序,读取rate数据源流数据,并将流数据处理结果写出到控制台,每次输出不超过30行。

代码实现如下。

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local")
      .appName("file sink")
      .getOrCreate()

    // spark.sparkContext.setLogLevel("INFO")      // 设置日志级别

    // Rate数据源读出数据
    val rateSourceDF = spark.readStream
      .format("rate")
      .option("rowsPerSecond","10")               // 每秒产生10条数据
      .option("numPartitions","2")                // 两个分区
      .load()

    // 将结果DataFrame写出到控制台
    val query = rateSourceDF.writeStream
      .outputMode("append")
      .format("console")                         // console data sink
      .option("truncate",value = false)          // 不截断显示
      .option("numRows",30)                      // 每次输出30行
      .start()

    // 等待流程序结束
    query.awaitTermination()
  }

执行上面的程序,输出结果如下:

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2021-02-03 20:07:36.987|0    |
|2021-02-03 20:07:37.187|2    |
|2021-02-03 20:07:37.387|4    |
|2021-02-03 20:07:37.587|6    |
|2021-02-03 20:07:37.787|8    |
|2021-02-03 20:07:37.087|1    |
|2021-02-03 20:07:37.287|3    |
|2021-02-03 20:07:37.487|5    |
|2021-02-03 20:07:37.687|7    |
|2021-02-03 20:07:37.887|9    |
+-----------------------+-----+
......

使用Memory Data Sink

与Console data sink类似,这个数据接收器也很容易理解和使用。事实上,它非常简单,不需要任何配置。它也不是一个容错的data sink,主要用于学习和测试,不在生产环境中使用。它收集的数据被发送给Driver,并作为内存中的表存储在Driver中。换句话说,可以发送到Memory data sink的数据量是由Driver中JVM拥有的内存大小决定的。

下面通过一个示例来了解Memory数据接收器的用法。

【示例】编写Spark结构化流程序,读取rate数据源流数据,并流数据处理结果写出到内存表中,然后对该内存表发出。

代码实现如下。

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local")
      .appName("memory data sink")
      .getOrCreate()

    // spark.sparkContext.setLogLevel("WARN")      // 设置日志级别
    // 读取rate数据源数据
    val ratesDF = spark.readStream
      .format("rate")
      .option("rowsPerSecond","5")
      .option("numPartitions","1")
      .load()

    // 将数据写出到Memory data sink,内存表名为"rates"
    val query = ratesDF.writeStream
      .outputMode("append")
      .format("memory")                 // 指定 memory data sink
      .queryName("rates")               // 指定查询名称
      .option("truncate", value = false)
      .start()

    // 我们针对“rates”内存表发出SQL查询
    spark.sql("select * from rates").show(10)

    // 统计“rates”内存表中的行数
    spark.sql("select count(*) from rates").show()

    query.awaitTermination()
  }

需要注意的一点是,即使在流查询ratesSQ停止之后,内存中的rates仍然会存在。然而,一旦一个新的流查询以相同的名称开始,那么来自内存中的数据就会被截断。


《Spark原理深入与编程实战》