创建Kafka Producer_使用Java API
Apache Kafka提供有Java API用于编写Producer和Consumer应用程序。
Apache Kafka包含四种核心的API:
- 1) Producer API,支持应用将数据流发送到Kafka集群的主题;
- 2) Consumer API,支持应用从Kafka集群的主题中读取数据流;
- 3) Streams API,支持数据流从输入主题转化到输出主题;
- 4) Connect API,支持实现持续地从一些源系统或应用摄入Kafka或者从Kafka推入一些源系统或应用的接口。
本节课我们主要讨论Producer API的使用。请确保本机上JDK版本要在8以上。
接下来,我们将使用Java API创建一个Kafka Producer程序。首先我们创建一个名为my-example-topic的Kafka复制主题,然后创建一个Kafka Producer向这个主题发送记录。这里将演示同步发送记录和异步发送记录两种方式。
1. 创建Maven项目,添加依赖
首先创建一个Maven项目,并修改pom.xml文件,添加对kafka-clients的依赖:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency>
2. 构造一个Kafka Producer
要创建一个Kafka生产者,需要给它传递一个引导服务器列表(一个Kafka broker列表)。还将指定一个client.id用来唯一标识这个生产者客户端。
在本例中,我们将发送带有id的消息。消息体是一个字符串,所以我们需要一个记录值序列化器(record value serializer),因为将在Kafka的记录value字段中发送消息体。消息id (long)将作为Kafka的记录键(key)发送。我们将需要指定一个Key序列化器和一个value序列化器,Kafka将使用它们将消息id编码为Kafka记录键(record key),并将消息体编码为Kafka记录值(record value)。
2.1 接下来,我们将导入Kafka包,并为主题定义一个常量,为生产者将连接的引导服务器列表定义一个常量。
import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProducerExample { private final static String TOPIC = "my-example-topic"; // private final static String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094"; private final static String BOOTSTRAP_SERVERS = "localhost:9092";
请注意,KafkaProducerExample导入了LongSerializer,它被配置为Kafka记录键(key)序列化器,并导入了StringSerializer,它被配置为记录值(value)序列化器。常量BOOTSTRAP_SERVERS被设置为localhost:9092(单节点)或localhost:9092,localhost:9093,localhost:9094(多节点)。常量TOPIC被设置为Kafka主题。
2.2 接下来,定义了一些常量,创建一个Kafka生产者。这些操作封装在静态方法createProducer中:
public class KafkaProducerExample { ... private static Producer<Long, String> createProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); return new KafkaProducer<>(props); }
要创建一个Kafka生产者,需要使用java.util.Properties并定义一些我们传递给KafkaProducer构造函数的属性。
在上面的代码中,KafkaProducerExample.createProducer将BOOTSTRAP_SERVERS_CONFIG(即"bootstrap.servers")属性设置为我们之前定义的Kafka broker地址列表。BOOTSTRAP_SERVERS_CONFIG值是一个用逗号分隔的主机/端口对列表,生产者使用它建立到Kafka集群的初始连接。生产者使用集群中的所有服务器,不管我们在这里列出的是哪一个。此列表仅指定用于发现Kafka集群的全套服务器的初始Kafka broker。如果这个列表中的一个服务器宕机了,生产者就会去列表中的下一个broker来发现Kafka集群的完整拓扑结构。
CLIENT_ID_CONFIG(即"client.id")是一个id,在发出请求时传递给服务器,这样服务器就可以通过传递服务器端请求日志之类的生产者名称来跟踪请求的源,而不仅仅是IP/端口。
KEY_SERIALIZER_CLASS_CONFIG (即"key.serializer")是一个Kafka Serializer类,用于实现Kafka Serializer接口的Kafka记录键(key)。注意,我们将其设置为LongSerializer,因为示例中的消息id是长整数。
VALUE_SERIALIZER_CLASS_CONFIG (即"value.serializer")是一个Kafka Serializer类,用于实现Kafka Serializer接口的Kafka记录值(value)。注意,我们将其设置为StringSerializer,因为在我们的示例中,消息体是字符串。
2.3 使得Kafka Producer同步发送记录
Kafka提供了一个同步send方法来发送一个记录到一个主题。让我们使用这个方法发送一些消息id和消息到我们之前创建的Kafka主题。
public class KafkaProducerExample { ... static void runProducer(final int sendMessageCount) throws Exception { final Producer<Long, String> producer = createProducer(); long time = System.currentTimeMillis(); try { for (long index = time; index < time + sendMessageCount; index++) { final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, index, "Hello xueai8.com " + index); RecordMetadata metadata = producer.send(record).get(); long elapsedTime = System.currentTimeMillis() - time; System.out.printf("sent record(key=%s value=%s) " + "meta(partition=%d, offset=%d) time=%d\n", record.key(), record.value(), metadata.partition(), metadata.offset(), elapsedTime); } } finally { producer.flush(); producer.close(); } } ...
上面的语句中,遍历一个for循环,创建一个ProducerRecord,发送一个示例消息(“Hello xueai8.com” + 索引)作为记录值,并将for循环索引作为记录键。对于每次迭代,runProducer调用生产者的send方法(RecordMetadata metadata = producer.send(record).get())。该send方法返回一个Java Future。
响应的RecordMetadata具有写入记录的“partition”(分区)和该分区中记录的“offset”(偏移量)。
请注意对flush和close的调用。Kafka将自动刷新自己,但也可以显式调用flush发送累积的记录。在完成后关闭连接。
2.4 定义main方法
public static void main(String... args) throws Exception { if (args.length == 0) { runProducer(5); } else { runProducer(Integer.parseInt(args[0])); } }
main方法只是调用了runProducer方法。
3. 执行测试
请按以下步骤执行。
3.1)启动 Zookeeper
在第1个终端窗口,执行以下命令,启动Zookeeper:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./bin/zookeeper-server-start.sh config/zookeeper.properties
3.2)启动 Zookeeper
在第2个终端窗口,执行以下命令,启动Kafka:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./bin/kafka-server-start.sh config/server.properties
3.3)创建topic主题 my-example-topic
在第3个终端窗口,执行以下命令,创建名为my-example-topic的topic主题:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./bin/kafka-topics.sh --create \ --replication-factor 1 \ --partitions 1 \ --topic my-example-topic \ --zookeeper localhost:2181 # 查看 $ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
3.4)启动消费者脚本
在第4个终端窗口,执行以下命令,启动消费者脚本,读取my-example-topic主题:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic my-example-topic \ --from-beginning
3.5)运行生产者程序
运行我们前面定义的KafkaProducerExample.java生产者程序。
<3.6)回到第4个终端窗口(消费者控制台),查看读取到的内容,应该可以看到类似下面这样的消息:/p>
Hello xueai8.com 1638859945250 Hello xueai8.com 1638859945251 Hello xueai8.com 1638859945252 Hello xueai8.com 1638859945253 Hello xueai8.com 1638859945254
使用Kafka Producer异步发送记录
Kafka提供了一个异步send方法来发送一个记录到一个主题。下面我们重新定义一个方法,使用这个异步send方法发送一些消息id和消息到之前创建的Kafka主题。这里的最大区别是,使用lambda表达式来定义回调。
static void runProducer(final int sendMessageCount) throws InterruptedException { final Producer<Long, String> producer = createProducer(); long time = System.currentTimeMillis(); final CountDownLatch countDownLatch = new CountDownLatch(sendMessageCount); try { for (long index = time; index < time + sendMessageCount; index++) { final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, index, "Hello xueai8.com " + index); producer.send(record, (metadata, exception) -> { long elapsedTime = System.currentTimeMillis() - time; if (metadata != null) { System.out.printf("sent record(key=%s value=%s) " + "meta(partition=%d, offset=%d) time=%d\n", record.key(), record.value(), metadata.partition(), metadata.offset(), elapsedTime); } else { exception.printStackTrace(); } countDownLatch.countDown(); }); } countDownLatch.await(25, TimeUnit.SECONDS); }finally { producer.flush(); producer.close(); } }
请注意使用了CountDownLatch,这样就可以发送所有N条消息,然后等待它们全部发送。
异步接口回调和异步发送方法
Kafka定义了一个Callback接口,用于异步操作。回调接口允许在请求完成时执行代码。回调在一个后台I/O线程中执行,所以它应该是快速的(不要阻塞它)。异步操作完成时调用onCompletion(RecordMetadata metadata, Exception exception) 。如果操作成功,则设置metadata(非空);如果操作出错,则设置exception(非空)。
异步send方法用于向主题发送一条记录,并且在send被确认时调用所提供的回调。send方法是异步的,一旦记录存储在缓冲区中,等待发送到Kafka broker,调用时立即返回。send方法允许并行地发送许多记录,而无需阻塞以等待每条记录之后的响应。
由于send调用是异步的,它为将分配给该记录的RecordMetadata返回一个Future。在这个future上调用get()将会阻塞,直到相关的请求完成,然后返回记录的元数据,或者抛出在发送记录时发生的任何异常。
此案例常见问题
问:Callback lambda做什么?
答:当请求完成时,回调将得到通知。
问:如果引导列表中的第一个服务器宕机了会发生什么?生产者仍然可以连接到集群中的其他Kafka broker吗?
答:生产者将尝试联系列表中的下一个broker。一旦联系上任何一个broker,就会让生产者知道整个Kafka集群。只要列表中至少有一个broker在运行,生产者就会进行连接。如果有100个broker,并且引导列表中3个服务器中的两个broker关闭了,生产者仍然可以使用剩下的98个broker。
问:什么时候使用Kafka异步发送?
答:如果已经在使用基于异步的代码(Akka, QBit, Reakt, Vert.x),并且想快速发送记录。
问:为什么一个Kafka记录需要两个序列化器?
答:其中一个序列化器用于Kafka记录键(record key),另一个用于Kafka记录值(record value)。