使用数据源Kafka

Kafka通常用于构建实时流数据管道,以可靠地在系统之间移动数据,还用于转换和响应数据流。Kafka作为集群运行在一个或多个服务器上。

配套视频:

使用Kafka数据源


Kafka的一些关键概念描述如下:

  • Topic:主题。消息发布到的类别或流名称的高级抽象。主题可以有0、1或多个消费者,这些消费者订阅发布到该主题的消息。用户为每个新的消息类别定义一个新主题;
  • Producers:生产者。向主题发布消息的客户端;
  • Consumers:消费者。使用来自主题的消息的客户端;
  • Brokers:服务器。复制和持久化消息数据的一个或多个服务器。

此外,生产者和消费者可以同时对多个主题进行读写。每个Kafka主题都是分区的,写入每个分区的消息都是顺序的。分区中的消息具有一个偏移量,用来惟一标识分区内的每个消息。

主题的分区是分布式的,每个Broker处理对分区共享的请求。每个分区在Brokers(数量可配置的)之间复制。Kafka集群在一段时间内(可配置的)保留所有已发布的消息。Kafka使用ZooKeeper作为其分布式进程的协调服务。

说明:Kafka的数据源可能是在生产型流应用程序中最常用的数据源。为了有效地处理这个数据源,我们需要一定的Kafka使用基本知识。

在使用Kafka数据源时,我们的流程序实际上充当了Kafka的消费者。因此,流程序所需要的信息与Kafka的消费者所需要的信息相似。下表列出了配置Kafka数据源一些选项:

Option 描述
kafka.bootstrap.servers host1:port1, host2:port2 Kafka服务器列表,逗号分隔。
subscribe topic1, topic2 这个数据源要读取的主题名列表,以逗号分隔。
subscribePattern topic.* 使用正则模式表示要读取数据的主题,比subscribe要灵活。
assign {topic1:[1,2], topic2:[3,4]} 指定要读取数据的主题的分区。这个信息必须是json格式。

其中必需的信息是要连接的Kafka服务器的列表,以及一个或多个从其读取数据的主题。为了支持选择从哪个主题和主题分区来读取数据的各种方法,它支持三种不同的方式来指定这些信息。我们只需要选择最适合自身用例的那个即可。

默认情况下,Kafka的数据源并不是Spark的内置数据源,因此如果要开发读取Kafka数据的Spark结构化流处理程序,必须添加Kafka的依赖包到classpath中。

下面我们通过一个示例来演示如何编写Spark结构化流处理程序来读取Kafka中的数据。

【示例】编写Spark结构化流程序作为Kafka的消费者程序,Kafka作为流数据源。

在这个示例中,我们使用Kafka自带的生产者脚本向Kafka的test主题发送内容,而Spark结构化流程序会订阅该主题。一旦它收到了订阅的消息,马上输出到控制台中。程序处理流程如下图所示:

首先,我们编写Spark结构化流程序代码。实现如下:

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local")
      .appName("Kafka Source")
      .getOrCreate()

    // 创建一个流来监听test topic的消息
    val dataDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest")
      .load()

    // 查看这个DataFrame的schema
    dataDF.printSchema()

    // 将该流转换为String数据类型(key和value都是字节数组形式)
    // kvstream = dataDF.selectExpr("CAST(key as string)", "CAST(value as string)")
    val kvstream = dataDF.selectExpr("CAST(value as string)","topic","partition","offset")

    // 将该流写出到控制台
    val query = kvstream.writeStream
      .outputMode("append")
      .format("console")
      .start()

    // 保持运行直到终止
    query.awaitTermination()
  }

启动流程序,开始接收从Kafka“test”主题订阅的消息。

向Kafka“test”主题发送消息,随意键入一些消息。例如:

> good good study
> day day up

回到流程序执行窗口,如果一切正常,应该可以看到在控制台输出收到的订阅消息,如下:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+---------+------+
|value|topic|partition|offset|
+-----+-----+---------+------+
+-----+-----+---------+------+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------------+-----+---------+------+
|          value|topic|partition|offset|
+---------------+-----+---------+------+
|good good study| test|        0|     0|
+---------------+-----+---------+------+

-------------------------------------------
Batch: 2
-------------------------------------------
+----------+-----+---------+------+
|     value|topic|partition|offset|
+----------+-----+---------+------+
|day day up| test|        0|     1|
+----------+-----+---------+------+

《PySpark原理深入与编程实战》