使用流数据源Kafka

Kafka通常用于构建实时流数据管道,以可靠地在系统之间移动数据,还用于转换和响应数据流。Kafka作为集群运行在一个或多个服务器上。

Kafka的一些关键概念描述如下:

  • Topic:主题。消息发布到的类别或流名称的高级抽象。主题可以有0、1或多个消费者,这些消费者订阅发布到该主题的消息。用户为每个新的消息类别定义一个新主题;
  • Producers:生产者。向主题发布消息的客户端;
  • Consumers:消费者。使用来自主题的消息的客户端;
  • Brokers:服务器。复制和持久化消息数据的一个或多个服务器。

此外,生产者和消费者可以同时对多个主题进行读写。每个Kafka主题都是分区的,写入每个分区的消息都是顺序的。分区中的消息具有一个偏移量,用来惟一标识分区内的每个消息。

主题的分区是分布式的,每个Broker处理对分区共享的请求。每个分区在Brokers(数量可配置的)之间复制。Kafka集群在一段时间内(可配置的)保留所有已发布的消息。Kafka使用ZooKeeper作为其分布式进程的协调服务。

说明:Kafka的数据源可能是在生产型流应用程序中最常用的数据源。为了有效地处理这个数据源,我们需要一定的Kafka使用基本知识。

在使用Kafka数据源时,我们的流程序实际上充当了Kafka的消费者。因此,流程序所需要的信息与Kafka的消费者所需要的信息相似。下表列出了配置Kafka数据源一些选项:

Option 描述
kafka.bootstrap.servers host1:port1, host2:port2 Kafka服务器列表,逗号分隔。
subscribe topic1, topic2 这个数据源要读取的主题名列表,以逗号分隔。
subscribePattern topic.* 使用正则模式表示要读取数据的主题,比subscribe要灵活。
assign {topic1:[1,2], topic2:[3,4]} 指定要读取数据的主题的分区。这个信息必须是json格式。

其中必需的信息是要连接的Kafka服务器的列表,以及一个或多个从其读取数据的主题。为了支持选择从哪个主题和主题分区来读取数据的各种方法,它支持三种不同的方式来指定这些信息。我们只需要选择最适合自身用例的那个即可。

默认情况下,Kafka的数据源并不是PySpark的内置数据源,因此如果要开发读取Kafka数据的PySpark结构化流处理程序,必须添加Kafka的依赖包到classpath中。

下面我们通过一个示例来演示如何编写PySpark结构化流处理程序来读取Kafka中的数据。

【示例】编写PySpark结构化流程序作为Kafka的消费者程序,Kafka作为流数据源。

在这个示例中,我们使用Kafka自带的生产者脚本向Kafka的test主题发送内容,而PySpark结构化流程序会订阅该主题。一旦它收到了订阅的消息,马上输出到控制台中。程序处理流程如下图所示:

首先,我们编写PySpark结构化流程序代码。实现如下:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# 创建SparkSession实例
spark = SparkSession.builder.appName("Kafka Source").getOrCreate()

# 创建一个流来监听test topic的消息
dataDF = spark.readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "test") \
      .option("startingOffsets", "earliest") \
      .load()

# 查看这个DataFrame的schema
dataDF.printSchema()

# 将该流转换为String数据类型(key和value都是字节数组形式)
# kvstream = dataDF.selectExpr("CAST(key as string)", "CAST(value as string)")
kvstream = dataDF.selectExpr("CAST(value as string)", "topic", "partition", "offset")

# 将该流写出到控制台
query = kvstream.writeStream \
      .outputMode("append") \
      .format("console") \
      .start()

# 等待流程序执行结束(作为作业文件提交时启用)
# query.awaitTermination()

要运行这个流程序,请按以下步骤进行操作。

(1) 启动zookeeper服务。Kafka依赖于Apache ZooKeeper,所以在启动Kafka之前,要先启动它。打开一个终端窗口,执行如下命令:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

等待30秒左右ZooKeeper启动。

(2) 接下来,启动Kafka服务器。另打开一个终端窗口,执行如下命令:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-server-start.sh config/server.properties

等待30秒左右Kafka启动。

(3) 查看和创建Kafka主题(如果已经有了test主题,则此步略过)。另外打开第三个终端窗口,创建test主题,执行如下命令:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看已有的主题,执行如下命令:

$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181

(4) 启动流程序,开始接收从Kafka test主题订阅的消息。

(5) 向Kafka test主题发送消息。另外打开第四个终端窗口,生产消息并发布给test主题,执行如下命令:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

然后,随意键入一些消息。例如,输入如下内容:

> good good study
> day day up

(6) 回到流程序执行窗口,如果一切正常,应该可以看到在控制台输出收到的订阅消息,内容如下:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+---------+------+
|value|topic|partition|offset|
+-----+-----+---------+------+
+-----+-----+---------+------+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------------+-----+---------+------+
|          value|topic|partition|offset|
+---------------+-----+---------+------+
|good good study| test|        0|     0|
+---------------+-----+---------+------+

-------------------------------------------
Batch: 2
-------------------------------------------
+----------+-----+---------+------+
|     value|topic|partition|offset|
+----------+-----+---------+------+
|day day up| test|        0|     1|
+----------+-----+---------+------+

《Spark原理深入与编程实战》