使用数据源Kafka
Kafka通常用于构建实时流数据管道,以可靠地在系统之间移动数据,还用于转换和响应数据流。Kafka作为集群运行在一个或多个服务器上。
配套视频:
使用Kafka数据源
Kafka的一些关键概念描述如下:
- Topic:主题。消息发布到的类别或流名称的高级抽象。主题可以有0、1或多个消费者,这些消费者订阅发布到该主题的消息。用户为每个新的消息类别定义一个新主题;
- Producers:生产者。向主题发布消息的客户端;
- Consumers:消费者。使用来自主题的消息的客户端;
- Brokers:服务器。复制和持久化消息数据的一个或多个服务器。
此外,生产者和消费者可以同时对多个主题进行读写。每个Kafka主题都是分区的,写入每个分区的消息都是顺序的。分区中的消息具有一个偏移量,用来惟一标识分区内的每个消息。
主题的分区是分布式的,每个Broker处理对分区共享的请求。每个分区在Brokers(数量可配置的)之间复制。Kafka集群在一段时间内(可配置的)保留所有已发布的消息。Kafka使用ZooKeeper作为其分布式进程的协调服务。
说明:Kafka的数据源可能是在生产型流应用程序中最常用的数据源。为了有效地处理这个数据源,我们需要一定的Kafka使用基本知识。
在使用Kafka数据源时,我们的流程序实际上充当了Kafka的消费者。因此,流程序所需要的信息与Kafka的消费者所需要的信息相似。下表列出了配置Kafka数据源一些选项:
Option | 值 | 描述 |
---|---|---|
kafka.bootstrap.servers | host1:port1, host2:port2 | Kafka服务器列表,逗号分隔。 |
subscribe | topic1, topic2 | 这个数据源要读取的主题名列表,以逗号分隔。 |
subscribePattern | topic.* | 使用正则模式表示要读取数据的主题,比subscribe要灵活。 |
assign | {topic1:[1,2], topic2:[3,4]} | 指定要读取数据的主题的分区。这个信息必须是json格式。 |
其中必需的信息是要连接的Kafka服务器的列表,以及一个或多个从其读取数据的主题。为了支持选择从哪个主题和主题分区来读取数据的各种方法,它支持三种不同的方式来指定这些信息。我们只需要选择最适合自身用例的那个即可。
默认情况下,Kafka的数据源并不是Spark的内置数据源,因此如果要开发读取Kafka数据的Spark结构化流处理程序,必须添加Kafka的依赖包到classpath中。
下面我们通过一个示例来演示如何编写Spark结构化流处理程序来读取Kafka中的数据。
【示例】编写Spark结构化流程序作为Kafka的消费者程序,Kafka作为流数据源。
在这个示例中,我们使用Kafka自带的生产者脚本向Kafka的test主题发送内容,而Spark结构化流程序会订阅该主题。一旦它收到了订阅的消息,马上输出到控制台中。程序处理流程如下图所示:
首先,我们编写Spark结构化流程序代码。实现如下:
def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local") .appName("Kafka Source") .getOrCreate() // 创建一个流来监听test topic的消息 val dataDF = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", "test") .option("startingOffsets", "earliest") .load() // 查看这个DataFrame的schema dataDF.printSchema() // 将该流转换为String数据类型(key和value都是字节数组形式) // kvstream = dataDF.selectExpr("CAST(key as string)", "CAST(value as string)") val kvstream = dataDF.selectExpr("CAST(value as string)","topic","partition","offset") // 将该流写出到控制台 val query = kvstream.writeStream .outputMode("append") .format("console") .start() // 保持运行直到终止 query.awaitTermination() }
启动流程序,开始接收从Kafka“test”主题订阅的消息。
向Kafka“test”主题发送消息,随意键入一些消息。例如:
> good good study > day day up
回到流程序执行窗口,如果一切正常,应该可以看到在控制台输出收到的订阅消息,如下:
root |-- key: binary (nullable = true) |-- value: binary (nullable = true) |-- topic: string (nullable = true) |-- partition: integer (nullable = true) |-- offset: long (nullable = true) |-- timestamp: timestamp (nullable = true) |-- timestampType: integer (nullable = true) ------------------------------------------- Batch: 0 ------------------------------------------- +-----+-----+---------+------+ |value|topic|partition|offset| +-----+-----+---------+------+ +-----+-----+---------+------+ ------------------------------------------- Batch: 1 ------------------------------------------- +---------------+-----+---------+------+ | value|topic|partition|offset| +---------------+-----+---------+------+ |good good study| test| 0| 0| +---------------+-----+---------+------+ ------------------------------------------- Batch: 2 ------------------------------------------- +----------+-----+---------+------+ | value|topic|partition|offset| +----------+-----+---------+------+ |day day up| test| 0| 1| +----------+-----+---------+------+