创建Kafka Consumer_使用Java API

Apache Kafka提供有Java API用于编写Producer和Consumer应用程序。

Apache Kafka包含四种核心的API:

  • 1) Producer API,支持应用将数据流发送到Kafka集群的主题;
  • 2) Consumer API,支持应用从Kafka集群的主题中读取数据流;
  • 3) Streams API,支持数据流从输入主题转化到输出主题;
  • 4) Connect API,支持实现持续地从一些源系统或应用摄入Kafka或者从Kafka推入一些源系统或应用的接口。

本节课我们主要讨论Consumer API的使用,来自Kafka的消费者API帮助连接到Kafka集群并消费数据流。请确保本机上JDK版本要在8以上。

Consumer是一个从Kafka topic中读取数据的应用程序。它订阅Kafka集群中的一个或多个主题,并从Kafka主题中获取令牌或消息。消费者到Kafka集群的连接性是通过心跳得知的。心跳设置在消费者端,让Zookeeper或Broker协调器知道消费者端是否仍然连接到集群。心跳的缺失意味着消费者不再连接到集群,在这种情况下,Broker协调器必须重新平衡负载。Heartbeat是集群的开销。通过考虑数据吞吐量和开销,Consumer上的心跳发生的间隔是可以配置的。此外,消费者可以分组,消费者组中的消费者可以共享他们订阅的topic的分区。如果一个Topic中有N个分区,消费者组中有N个消费者,并且该组订阅了一个Topic,那么每个消费者将从该主题的一个分区中读取数据。

接下来,我们将使用Java API创建一个Kafka Consumer程序。这个消费者从我们上节课中编写的Kafka Producer中获取消息。我们将演示如何使用Kafka消费者处理来自Kafka主题的记录。

在上一节课中,我们创建了一个简单的Java示例来创建一个Kafka生成器。我们还创建了复制的Kafka主题,称为my-example-topic,然后使用Kafka生产者发送记录(同步和异步)。这节课,我们创建的消费者程序将使用这些消息。

1. 创建Maven项目,添加依赖

首先创建一个Maven项目,并修改pom.xml文件,添加对kafka-clients的依赖:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.4.1</version>
</dependency>

2. 构造一个Kafka Consumer

要创建一个Kafka消费者,需要指定引导服务器(一个Kafka broker列表)。还需要定义一个group.id来标识该消费者属于哪个消费组。然后需要指定一个Kafka记录键反序列化器(record key deserializer )和一个记录值反序列化器(record value deserializer )。然后,需要将消费者订阅到主题my-example-topic。

2.1 接下来,导入Kafka包并为主题定义一个常量,并为消费者将连接的引导服务器列表定义一个常量。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;


import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {

    private final static String TOPIC = "my-example-topic";
    // private final static String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    ...
}

请注意,KafkaConsumerExample导入了LongDeserializer,它被配置为Kafka记录键(key)反序列化器,并导入了StringDeserializer,它被配置为记录值(value)反序列化器。常量BOOTSTRAP_SERVERS被设置为localhost:9092(单节点)或localhost:9092,localhost:9093,localhost:9094(多节点)。常量TOPIC被设置为Kafka主题。

2.2 创建Kafka Consumer,使用topic接收记录

接下来,定义了一些常量,创建一个Kafka消费者。这些操作封装在静态方法createConsumer中:

public class KafkaConsumerExample {
  ...

  private static Consumer<Long, String> createConsumer() {
      final Properties props = new Properties();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

      // 使用props创建消费者
      final Consumer<Long, String> consumer = new KafkaConsumer<>(props);

      // 订阅该主题
      consumer.subscribe(Collections.singletonList(TOPIC));
      return consumer;
  }
  ...
}

要创建一个Kafka消费者,需要使用java.util.Properties并定义一些我们传递给KafkaConsumer构造函数的属性。

在上面的代码中,KafkaConsumerExample.createConsumer 将BOOTSTRAP_SERVERS_CONFIG(即"bootstrap.servers")属性设置为我们之前定义的Kafka broker地址列表。BOOTSTRAP_SERVERS_CONFIG值是一个用逗号分隔的主机/端口对列表,消费者使用它建立到Kafka集群的初始连接。消费者使用集群中的所有服务器,不管我们在这里列出的是哪一个。此列表仅指定用于发现Kafka集群的全套服务器的初始Kafka broker。如果这个列表中的一个服务器宕机了,消费者就会去列表中的下一个broker来发现Kafka集群的完整拓扑结构。

GROUP_ID_CONFIG标识该消费者的消费者组。

KEY_DESERIALIZER_CLASS_CONFIG (即"key.deserializer")是一个Kafka Deserializer类,用于实现Kafka Deserializer接口的Kafka记录键(key)。注意,我们将其设置为LongDeserializer,因为示例中的消息id是长整数。

VALUE_DESERIALIZER_CLASS_CONFIG (即"value.deserializer")是一个Kafka Deserializer类,用于实现Kafka Deserializer接口的Kafka记录值(value)。注意,我们将其设置为StringDeserializer,因为在我们的示例中,消息体是字符串。

需要将消费者订阅到主题consumer.subscribe(Collections.singletonList(TOPIC));。这个订阅方法获取要订阅的主题列表,如果当前有订阅,该列表将替换当前订阅。

2.3 使用Consumer处理来自Kafka的消息

现在,让我们用Kafka Consumer处理一些记录。

public class KafkaConsumerExample {
  ...

    static void runConsumer() throws InterruptedException {
        final Consumer<Long, String> consumer = createConsumer();

        final int giveUp = 100;   
        int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);

            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }

            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
                        record.key(), record.value(),
                        record.partition(), record.offset());
            });

            consumer.commitAsync();
        }
        consumer.close();
        System.out.println("DONE");
    }
}

在上面的语句中,使用了ConsumerRecords,它是来自Kafka主题分区的一组记录。ConsumerRecords类是一个容器,它保存一个特定主题的每个分区的ConsumerRecord列表。对于consumer.poll()返回的每个主题分区,都有一个ConsumerRecord列表。

请注意,如果接收到记录(consumerRecords.count()!=0),那么runConsumer方法将调用consumer.commitAsync(),它将提交在最后一次调用consumer.poll(…)时为所有订阅的主题分区列表返回的偏移量。

2.4 Kafka Consumer Poll方法

轮询方法poll根据当前分区偏移量返回获取的记录。轮询方法poll是等待指定时间(以秒为单位)的阻塞方法。如果在指定的时间段之后没有可用的记录,则轮询方法返回一个空的ConsumerRecords。

当新的记录可用时,轮询方法直接返回。

可以用props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);控制poll()返回的最大记录数。轮询方法poll不是线程安全的,不能从多个线程调用。

2.5 定义main方法

接下来,定义用来测试的main方法。

public class KafkaConsumerExample {

  public static void main(String... args) throws Exception {
      runConsumer();
  }
}

这个main方法只是调用了runConsumer。

3. 执行测试

请按以下步骤执行。

3.1)启动 Zookeeper

在第1个终端窗口,执行以下命令,启动Zookeeper:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

3.2)启动 Zookeeper

在第2个终端窗口,执行以下命令,启动Kafka:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-server-start.sh config/server.properties

3.3)创建topic主题 my-example-topic

在第3个终端窗口,执行以下命令,创建名为my-example-topic的topic主题:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./bin/kafka-topics.sh --create \
      --replication-factor 1 \
      --partitions 1 \
      --topic my-example-topic \
      --zookeeper localhost:2181
# 查看
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181

3.4)运行前面我们定义的KafkaConsumerExample.java消费者程序,轮询/读取my-example-topic主题。

3.5)运行我们上节课定义的KafkaProducerExample.java生产者程序,向my-example-topic主题发送消息。

3.6)回到消费者程序运行控制台,查看读取到的内容,应该可以看到类似下面这样的消息:

Consumer Record:(1638867557865, Hello xueai8.com 1638867557865, 0, 10)
Consumer Record:(1638867557866, Hello xueai8.com 1638867557866, 0, 11)
Consumer Record:(1638867557867, Hello xueai8.com 1638867557867, 0, 12)
Consumer Record:(1638867557868, Hello xueai8.com 1638867557868, 0, 13)
Consumer Record:(1638867557869, Hello xueai8.com 1638867557869, 0, 14)

4. 尝试:三个消费者在同一组,一个生产者发送25条消息

在IDE中运行KafkaConsumerExample.java消费者程序示例三次。然后将生产者程序修改为发送25条记录,而不是5条。然后从IDE运行一次KafkaProducerExample.java生产者程序。会发生什么呢?消费者应该分享这些信息。

KafkaProducerExample.java生产者程序输出:

sent record(key=1495048417121 value=..) meta(partition=6, offset=16) time=118
sent record(key=1495048417131 value=..) meta(partition=6, offset=17) time=120
sent record(key=1495048417133 value=..) meta(partition=12, offset=17) time=120
sent record(key=1495048417140 value=..) meta(partition=12, offset=18) time=121
sent record(key=1495048417143 value=..) meta(partition=12, offset=19) time=121
sent record(key=1495048417123 value=..) meta(partition=0, offset=19) time=121
sent record(key=1495048417126 value=..) meta(partition=0, offset=20) time=121
sent record(key=1495048417134 value=..) meta(partition=0, offset=21) time=122
sent record(key=1495048417122 value=..) meta(partition=3, offset=19) time=122
sent record(key=1495048417127 value=..) meta(partition=3, offset=20) time=122
sent record(key=1495048417139 value=..) meta(partition=3, offset=21) time=123
sent record(key=1495048417142 value=..) meta(partition=3, offset=22) time=123
sent record(key=1495048417136 value=..) meta(partition=10, offset=19) time=127
sent record(key=1495048417144 value=..) meta(partition=1, offset=26) time=128
sent record(key=1495048417125 value=..) meta(partition=5, offset=22) time=128
sent record(key=1495048417138 value=..) meta(partition=5, offset=23) time=128
sent record(key=1495048417128 value=..) meta(partition=8, offset=21) time=129
sent record(key=1495048417124 value=..) meta(partition=11, offset=18) time=129
sent record(key=1495048417130 value=..) meta(partition=11, offset=19) time=129
sent record(key=1495048417132 value=..) meta(partition=11, offset=20) time=130
sent record(key=1495048417141 value=..) meta(partition=11, offset=21) time=130
sent record(key=1495048417145 value=..) meta(partition=11, offset=22) time=131
sent record(key=1495048417129 value=..) meta(partition=2, offset=24) time=132
sent record(key=1495048417135 value=..) meta(partition=2, offset=25) time=132
sent record(key=1495048417137 value=..) meta(partition=2, offset=26) time=132

注意生产者发送25条消息。

同一组中Consumer 0输出:

Consumer Record:(1495048417121, Hello xueai8.com 1495048417121, 6, 16)
Consumer Record:(1495048417131, Hello xueai8.com 1495048417131, 6, 17)
Consumer Record:(1495048417125, Hello xueai8.com 1495048417125, 5, 22)
Consumer Record:(1495048417138, Hello xueai8.com 1495048417138, 5, 23)
Consumer Record:(1495048417128, Hello xueai8.com 1495048417128, 8, 21)

同一组中Consumer 1输出:

Consumer Record:(1495048417123, Hello xueai8.com 1495048417123, 0, 19)
Consumer Record:(1495048417126, Hello xueai8.com 1495048417126, 0, 20)
Consumer Record:(1495048417134, Hello xueai8.com 1495048417134, 0, 21)
Consumer Record:(1495048417144, Hello xueai8.com 1495048417144, 1, 26)
Consumer Record:(1495048417122, Hello xueai8.com 1495048417122, 3, 19)
Consumer Record:(1495048417127, Hello xueai8.com 1495048417127, 3, 20)
Consumer Record:(1495048417139, Hello xueai8.com 1495048417139, 3, 21)
Consumer Record:(1495048417142, Hello xueai8.com 1495048417142, 3, 22)
Consumer Record:(1495048417129, Hello xueai8.com 1495048417129, 2, 24)
Consumer Record:(1495048417135, Hello xueai8.com 1495048417135, 2, 25)
Consumer Record:(1495048417137, Hello xueai8.com 1495048417137, 2, 26)

同一组中Consumer 2输出:

Consumer Record:(1495048417136, Hello xueai8.com 1495048417136, 10, 19)
Consumer Record:(1495048417133, Hello xueai8.com 1495048417133, 12, 17)
Consumer Record:(1495048417140, Hello xueai8.com 1495048417140, 12, 18)
Consumer Record:(1495048417143, Hello xueai8.com 1495048417143, 12, 19)
Consumer Record:(1495048417124, Hello xueai8.com 1495048417124, 11, 18)
Consumer Record:(1495048417130, Hello xueai8.com 1495048417130, 11, 19)
Consumer Record:(1495048417132, Hello xueai8.com 1495048417132, 11, 20)
Consumer Record:(1495048417141, Hello xueai8.com 1495048417141, 11, 21)
Consumer Record:(1495048417145, Hello xueai8.com 1495048417145, 11, 22)

尝试:三个消费者在不同的消费者组,一个生产者发送5条消息

修改消费者程序,使每个消费者进程都有一个唯一的组id。

从上次运行中停止所有的消费者和生产者进程。

然后在IDE中执行消费者程序示例三次。修改生产者发送5条记录而不是25条,然后从IDE运行一次生产者。思考一下,会发生什么呢?每个使用者都应该得到所有消息的副本。

首先,让我们修改KafkaConsumerExample.java,使其消费者组id是唯一的。如下所示:

public class KafkaConsumerExample {

    private final static String TOPIC = "my-example-topic";
    // private final static String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    private static Consumer<Long, String> createConsumer() {
        final Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer" + System.currentTimeMillis());

        ...
  }
...
}

注意,为了使组id惟一,向其添加了System.currentTimeMillis()。

生产者输出:

sent record(key=1495049585396 value=..) meta(partition=7, offset=30) time=134
sent record(key=1495049585392 value=..) meta(partition=4, offset=24) time=138
sent record(key=1495049585393 value=..) meta(partition=4, offset=25) time=139
sent record(key=1495049585395 value=..) meta(partition=4, offset=26) time=139
sent record(key=1495049585394 value=..) meta(partition=11, offset=25) time=140

注意生产者发送了25条消息。

Consumer 0在自己的组中:

Consumer Record:(1495049585396, Hello xueai8.com 1495049585396, 7, 30)
Consumer Record:(1495049585394, Hello xueai8.com 1495049585394, 11, 25)
Consumer Record:(1495049585392, Hello xueai8.com 1495049585392, 4, 24)
Consumer Record:(1495049585393, Hello xueai8.com 1495049585393, 4, 25)
Consumer Record:(1495049585395, Hello xueai8.com 1495049585395, 4, 26)

Consumer 1在自己的组中:

Consumer Record:(1495049585396, Hello xueai8.com 1495049585396, 7, 30)
Consumer Record:(1495049585394, Hello xueai8.com 1495049585394, 11, 25)
Consumer Record:(1495049585392, Hello xueai8.com 1495049585392, 4, 24)
Consumer Record:(1495049585393, Hello xueai8.com 1495049585393, 4, 25)
Consumer Record:(1495049585395, Hello xueai8.com 1495049585395, 4, 26)

Consumer 2在自己的组中:

Consumer Record:(1495049585396, Hello xueai8.com 1495049585396, 7, 30)
Consumer Record:(1495049585394, Hello xueai8.com 1495049585394, 11, 25)
Consumer Record:(1495049585392, Hello xueai8.com 1495049585392, 4, 24)
Consumer Record:(1495049585393, Hello xueai8.com 1495049585393, 4, 25)
Consumer Record:(1495049585395, Hello xueai8.com 1495049585395, 4, 26)

Kafka消费者常见问题

问:如何在消费者组中演示消费者如何划分主题分区并共享它们?

答:在同一个消费者组中运行三个消费者,然后从生产者那里发送25条消息。我们看到每个消费者都拥有一组分区。

问:如何向不同消费者组的消费者展示他们各自的偏移量?

答:我们运行3个消费者,每个消费者都有自己唯一的消费者组,然后从生产者那里发送5条消息。我们看到每个消费者拥有每个分区。/p>

问:轮询获得有多少记录?

答:在传递给KafkaConsumer的属性中通过put(ConsumerConfig。MAX_POLL_RECORDS_CONFIG, 100);设置的数量。

问:一次对轮询的调用是否从两个不同的分区获得记录?

答:No。


《PySpark原理深入与编程实战》