PySpark结构化流编程模型

PySpark结构化流提供了快速、可扩展、容错、端到端的精确一次性流处理,而用户无需对流进行推理。结构化流操作直接工作在DataFrame上。不再有“流”的概念,只有流式DataFrames和普通DataFrames。流式DataFrames是作为append-only表实现的。在流数据上的查询返回新的DataFrame,使用它们就像在批处理程序中一样。

使用PySpark结构化流的模型如下图所示:

结构化流的关键思想是将实时数据流视为一个不断追加的表。这就产生了一个新的流处理模型,它与批处理模型非常相似。当一组新的数据到达时,将这些新到达的数据作为一组新的行添加到输入表。我们将把流计算表示为标准的批处理查询,就像在静态表上的查询一样,PySpark将其作为无边界输入表上的增量查询运行。

从根本上说,结构化流由PySpark SQL的Catalyst优化器负责优化。因此,它使开发人员不再担心底层的管道,在处理静态或实时数据流时,使查询更高效。

结构化流编程模型

假设我们想维护从监听TCP套接字的数据服务器接收到的文本数据的运行时单词计数。让我们看看如何使用结构化流来表达这一需求。

【例】实现Spark运行时单词计数流程序。

首先需要使用Netcat(在大多数类unix系统中可以找到的小型实用程序)作为数据服务器。在Linux的终端中,执行如下命令,启动Netcat服务器,使其保持运行。

$ nc -lk 9999

注:如果没有Netcat服务器的话,可以使用如下命令先安装:

$ sudo yum install -y nc

然后,编写我们的结构化流程序并执行。请按以下步骤操作。

1)先导入必要的类并创建本地SparkSession,这是所有与PySpark相关的功能的起点,代码如下。

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

# 构造一个SparkSession实例
spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

2)接下来,创建一个流DataFrame,它表示从所监听的localhost:9999服务器接收到的文本数据,并对该DataFrame进行转换以进行单词计数,代码如下。

# 创建表示从连接到localhost:9999的输入行流的DataFrame
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# 将行拆分为单词
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# 生成运行时单词计数
wordCounts = words.groupBy("word").count()

上面的lines这个DataFrame表示一个包含流文本数据的无界表,这个表包含一列名为value的字符串,流文本数据中的每一行都成为表中的一行。接下来,使用了两个内置的SQL函数—split()和explode(),将每行内容分割为多个行,分隔后每行包含一个单词。最后,定义了名为wordCounts的DataFrame,方法是根据数据集中唯一的值进行分组并对它们进行计数。注意,这是一个流DataFrame,它表示流的运行运行时单词数。

3)在流数据上设置查询。

现在已经在流数据上设置了查询,剩下的就是实际开始接收数据并计算计数。为此,我们进行设置,以便每次更新计数时都将完整的计数集(由outputMode("complete")指定)打印到控制台。然后使用start()启动流计算,代码如下。

# 启动流计算,将运行时单词计数打印到控制台的查询
query = wordCounts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

执行此代码后,流计算将在后台开始。其中query对象是该活动流查询的句柄,使用awaitTermination()等待查询终止,以防止查询处于活动状态时进程退出。对于生产和长期运行的流应用程序,有必要调用StreamingQuery.awaitTermination()函数,这是一个阻塞调用,它会防止Driver驱动程序退出,并允许流查询持续运行和当新数据到达数据源时处理新数据。

完整的程序代码如下:

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("用法: structed_wordcount.ipynb  ", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()

    # 创建表示从连接到localhost:9999的输入行流的DataFrame
    lines = spark.readStream \
        .format("socket") \
        .option("host", sys.argv[1]) \
        .option("port", sys.argv[2]) \
        .load()

    # 将行拆分为单词
    words = lines.select(
        explode(
            split(lines.value, " ")
        ).alias("word")
    )

    # 生成运行时单词计数
    wordCounts = words.groupBy("word").count()

    # 启动流计算,将运行时单词计数打印到控制台的查询
    query = wordCounts.writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

    query.awaitTermination()

4) 切换到Netcat窗口,输入几行任意的内容,单词之间用空格分割。例如:

good good study
study day day up

5)切换回流程序执行窗口,查看输出结果。会看到类似如下这样的输出结果:

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|study|    1|
| good|    2|
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  day|    2|
|study|    2|
|   up|    1|
| good|    2|
+-----+-----+

有时用户希望停止流查询来改变输出模式、触发器或其他配置。可以使用StreamingQuery.stop()函数来阻止数据源接收新数据,并停止在流查询中逻辑的连续执行,代码如下:

# 这是阻塞调用
mobileSQ.awaitTermination()

# 停止流查询
mobileSQ.stop()

《Flink原理深入与编程实战》