Spark结构化流编程模型
Spark结构化流提供了快速、可扩展、容错、端到端的精确一次性流处理,而用户无需对流进行推理。结构化流操作直接工作在DataFrame(或DataSets)上。不再有“流”的概念,只有流式DataFrames和普通DataFrames。流式DataFrames是作为append-only表实现的。在流数据上的查询返回新的DataFrame,使用它们就像在批处理程序中一样。
使用Spark结构化流的模型如下图所示:
结构化流的关键思想是将实时数据流视为一个不断追加的表。这就产生了一个新的流处理模型,它与批处理模型非常相似。当一组新的数据到达时,将这些新到达的数据作为一组新的行添加到输入表。我们将把流计算表示为标准的批处理查询,就像在静态表上的查询一样,Spark将其作为无边界输入表上的增量查询运行。
从根本上说,结构化流由Spark SQL的Catalyst优化器负责优化。因此,它使开发人员不再担心底层的管道,在处理静态或实时数据流时,使查询更高效。
结构化流编程模型
假设我们想维护从监听TCP套接字的数据服务器接收到的文本数据的运行时单词计数。让我们看看如何使用结构化流来表达这一需求。
【例】实现Spark运行时单词计数流程序。
首先需要使用Netcat(在大多数类unix系统中可以找到的小型实用程序)作为数据服务器。在Linux的终端中,执行如下命令,启动Netcat服务器,使其保持运行。
$ nc -lk 9999
注:如果没有Netcat服务器的话,可以使用如下命令先安装:
$ sudo yum install -y nc
然后,编写我们的结构化流程序并执行。请按以下步骤操作。
1)先导入必要的类并创建本地SparkSession。
import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession val spark = SparkSession .builder .appName("StructuredNetworkWordCount") .getOrCreate() // 用于从RDD到DataFrame的隐式转换 import spark.implicits._
2)接下来,创建一个流DataFrame,它表示从所监听的localhost:9999服务器接收到的文本数据,并对该DataFrame进行转换以进行单词计数。
// 创建表示从连接到localhost:9999的输入行流的DataFrame val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // 这个line DataFrame表示一个包含流文本数据的无界表 // 将行拆分为单词 val words = lines.as[String].flatMap(_.split(" ")) // 生成运行时单词计数 // wordCounts是一个流DataFrame,它表示流的运行时单词计数 val wordCounts = words.groupBy("value").count()
3)在流数据上设置查询。
为此,我们进行设置,以便每次更新计数时都将完整的计数集(由outputMode("complete")指定)打印到控制台。然后使用start()启动流计算。
// 开始运行将运行时单词计数打印到控制台的查询 val query = wordCounts.writeStream .outputMode("complete") .format("console") .start() query.awaitTermination()
执行此代码后,流计算将在后台开始。其中query对象是该活动流查询的句柄,使用awaitTermination()等待查询终止,以防止查询处于活动状态时进程退出。对于生产和长期运行的流应用程序,有必要调用StreamingQuery.awaitTermination()函数,这是一个阻塞调用,它会防止Driver驱动程序退出,并允许流查询持续运行和当新数据到达数据源时处理新数据。
4) 切换到Netcat窗口,输入几行任意的内容,单词之间用空格分割。例如:
good good study study day day up
5)切换回流程序执行窗口,查看输出结果。会看到类似如下这样的输出结果:
------------------------------------------- Batch: 1 ------------------------------------------- +-----+-----+ |value|count| +-----+-----+ |study| 1| | good| 2| +-----+-----+ ------------------------------------------- Batch: 2 ------------------------------------------- +-----+-----+ |value|count| +-----+-----+ | day| 2| |study| 2| | up| 1| | good| 2| +-----+-----+