Spark结构化流编程模型

Spark结构化流提供了快速、可扩展、容错、端到端的精确一次性流处理,而用户无需对流进行推理。结构化流操作直接工作在DataFrame(或DataSets)上。不再有“流”的概念,只有流式DataFrames和普通DataFrames。流式DataFrames是作为append-only表实现的。在流数据上的查询返回新的DataFrame,使用它们就像在批处理程序中一样。

使用Spark结构化流的模型如下图所示:

结构化流的关键思想是将实时数据流视为一个不断追加的表。这就产生了一个新的流处理模型,它与批处理模型非常相似。当一组新的数据到达时,将这些新到达的数据作为一组新的行添加到输入表。我们将把流计算表示为标准的批处理查询,就像在静态表上的查询一样,Spark将其作为无边界输入表上的增量查询运行。

从根本上说,结构化流由Spark SQL的Catalyst优化器负责优化。因此,它使开发人员不再担心底层的管道,在处理静态或实时数据流时,使查询更高效。

结构化流编程模型

假设我们想维护从监听TCP套接字的数据服务器接收到的文本数据的运行时单词计数。让我们看看如何使用结构化流来表达这一需求。

【例】实现Spark运行时单词计数流程序。

首先需要使用Netcat(在大多数类unix系统中可以找到的小型实用程序)作为数据服务器。在Linux的终端中,执行如下命令,启动Netcat服务器,使其保持运行。

$ nc -lk 9999

注:如果没有Netcat服务器的话,可以使用如下命令先安装:

$ sudo yum install -y nc

然后,编写我们的结构化流程序并执行。请按以下步骤操作。

1)先导入必要的类并创建本地SparkSession。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()
  
// 用于从RDD到DataFrame的隐式转换
import spark.implicits._

2)接下来,创建一个流DataFrame,它表示从所监听的localhost:9999服务器接收到的文本数据,并对该DataFrame进行转换以进行单词计数。

// 创建表示从连接到localhost:9999的输入行流的DataFrame
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// 这个line DataFrame表示一个包含流文本数据的无界表
// 将行拆分为单词
val words = lines.as[String].flatMap(_.split(" "))

// 生成运行时单词计数
// wordCounts是一个流DataFrame,它表示流的运行时单词计数
val wordCounts = words.groupBy("value").count()

3)在流数据上设置查询。

为此,我们进行设置,以便每次更新计数时都将完整的计数集(由outputMode("complete")指定)打印到控制台。然后使用start()启动流计算。

// 开始运行将运行时单词计数打印到控制台的查询
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

执行此代码后,流计算将在后台开始。其中query对象是该活动流查询的句柄,使用awaitTermination()等待查询终止,以防止查询处于活动状态时进程退出。对于生产和长期运行的流应用程序,有必要调用StreamingQuery.awaitTermination()函数,这是一个阻塞调用,它会防止Driver驱动程序退出,并允许流查询持续运行和当新数据到达数据源时处理新数据。

4) 切换到Netcat窗口,输入几行任意的内容,单词之间用空格分割。例如:

good good study
study day day up

5)切换回流程序执行窗口,查看输出结果。会看到类似如下这样的输出结果:

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|study|    1|
| good|    2|
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  day|    2|
|study|    2|
|   up|    1|
| good|    2|
+-----+-----+

《Spark原理深入与编程实战》