PySpark结构化流编程模型
PySpark结构化流提供了快速、可扩展、容错、端到端的精确一次性流处理,而用户无需对流进行推理。结构化流操作直接工作在DataFrame上。不再有“流”的概念,只有流式DataFrames和普通DataFrames。流式DataFrames是作为append-only表实现的。在流数据上的查询返回新的DataFrame,使用它们就像在批处理程序中一样。
使用PySpark结构化流的模型如下图所示:
结构化流的关键思想是将实时数据流视为一个不断追加的表。这就产生了一个新的流处理模型,它与批处理模型非常相似。当一组新的数据到达时,将这些新到达的数据作为一组新的行添加到输入表。我们将把流计算表示为标准的批处理查询,就像在静态表上的查询一样,PySpark将其作为无边界输入表上的增量查询运行。
从根本上说,结构化流由PySpark SQL的Catalyst优化器负责优化。因此,它使开发人员不再担心底层的管道,在处理静态或实时数据流时,使查询更高效。
结构化流编程模型
假设我们想维护从监听TCP套接字的数据服务器接收到的文本数据的运行时单词计数。让我们看看如何使用结构化流来表达这一需求。
【例】实现Spark运行时单词计数流程序。
首先需要使用Netcat(在大多数类unix系统中可以找到的小型实用程序)作为数据服务器。在Linux的终端中,执行如下命令,启动Netcat服务器,使其保持运行。
$ nc -lk 9999
注:如果没有Netcat服务器的话,可以使用如下命令先安装:
$ sudo yum install -y nc
然后,编写我们的结构化流程序并执行。请按以下步骤操作。
1)先导入必要的类并创建本地SparkSession,这是所有与PySpark相关的功能的起点,代码如下。
from pyspark.sql import SparkSession from pyspark.sql.functions import explode from pyspark.sql.functions import split # 构造一个SparkSession实例 spark = SparkSession \ .builder \ .appName("StructuredNetworkWordCount") \ .getOrCreate()
2)接下来,创建一个流DataFrame,它表示从所监听的localhost:9999服务器接收到的文本数据,并对该DataFrame进行转换以进行单词计数,代码如下。
# 创建表示从连接到localhost:9999的输入行流的DataFrame lines = spark.readStream \ .format("socket") \ .option("host", "localhost") \ .option("port", 9999) \ .load() # 将行拆分为单词 words = lines.select( explode( split(lines.value, " ") ).alias("word") ) # 生成运行时单词计数 wordCounts = words.groupBy("word").count()
上面的lines这个DataFrame表示一个包含流文本数据的无界表,这个表包含一列名为value的字符串,流文本数据中的每一行都成为表中的一行。接下来,使用了两个内置的SQL函数—split()和explode(),将每行内容分割为多个行,分隔后每行包含一个单词。最后,定义了名为wordCounts的DataFrame,方法是根据数据集中唯一的值进行分组并对它们进行计数。注意,这是一个流DataFrame,它表示流的运行运行时单词数。
3)在流数据上设置查询。
现在已经在流数据上设置了查询,剩下的就是实际开始接收数据并计算计数。为此,我们进行设置,以便每次更新计数时都将完整的计数集(由outputMode("complete")指定)打印到控制台。然后使用start()启动流计算,代码如下。
# 启动流计算,将运行时单词计数打印到控制台的查询 query = wordCounts.writeStream \ .outputMode("complete") \ .format("console") \ .start() query.awaitTermination()
执行此代码后,流计算将在后台开始。其中query对象是该活动流查询的句柄,使用awaitTermination()等待查询终止,以防止查询处于活动状态时进程退出。对于生产和长期运行的流应用程序,有必要调用StreamingQuery.awaitTermination()函数,这是一个阻塞调用,它会防止Driver驱动程序退出,并允许流查询持续运行和当新数据到达数据源时处理新数据。
完整的程序代码如下:
import sys from pyspark.sql import SparkSession from pyspark.sql.functions import explode from pyspark.sql.functions import split if __name__ == "__main__": if len(sys.argv) != 3: print("用法: structed_wordcount.ipynb", file=sys.stderr) sys.exit(-1) spark = SparkSession \ .builder \ .appName("StructuredNetworkWordCount") \ .getOrCreate() # 创建表示从连接到localhost:9999的输入行流的DataFrame lines = spark.readStream \ .format("socket") \ .option("host", sys.argv[1]) \ .option("port", sys.argv[2]) \ .load() # 将行拆分为单词 words = lines.select( explode( split(lines.value, " ") ).alias("word") ) # 生成运行时单词计数 wordCounts = words.groupBy("word").count() # 启动流计算,将运行时单词计数打印到控制台的查询 query = wordCounts.writeStream \ .outputMode("complete") \ .format("console") \ .start() query.awaitTermination()
4) 切换到Netcat窗口,输入几行任意的内容,单词之间用空格分割。例如:
good good study study day day up
5)切换回流程序执行窗口,查看输出结果。会看到类似如下这样的输出结果:
------------------------------------------- Batch: 1 ------------------------------------------- +-----+-----+ |value|count| +-----+-----+ |study| 1| | good| 2| +-----+-----+ ------------------------------------------- Batch: 2 ------------------------------------------- +-----+-----+ |value|count| +-----+-----+ | day| 2| |study| 2| | up| 1| | good| 2| +-----+-----+
有时用户希望停止流查询来改变输出模式、触发器或其他配置。可以使用StreamingQuery.stop()函数来阻止数据源接收新数据,并停止在流查询中逻辑的连续执行,代码如下:
# 这是阻塞调用 mobileSQ.awaitTermination() # 停止流查询 mobileSQ.stop()