从命令行使用kafka
这一节我们将通过一个简单的示例,演示如何使用Kafka命令行中的生产者(Producer)和消费者(Consumer)。
我们将运行ZooKeeper,然后运行Kafka服务器/Broker。我们将使用一些Kafka命令行实用程序,来创建Kafka主题,通过生产者发送消息,并从命令行使用/消费消息。
运行Kafka的ZooKeeper
Kafka依赖于ZooKeeper。为了简单起见,我们将使用单个ZooKeeper节点。
Kafka提供了一个ZooKeeper的启动脚本,叫做zooKeeper-server-start.sh,位于PBLP平台的 ~/bigdata/kafka_2.12-2.4.1/bin/zooKeeper-server-start.sh。
Kafka发行版还提供了一个ZooKeeper配置文件,它被设置为单节点运行。
为了运行ZooKeeper,我们在Kafka安装目录中创建下面这个run-zookeeper.sh脚本并运行它。
#!/usr/bin/env bash cd ~/bigdata/kafka_2.12-2.4.1 ./bin/zookeeper-server-start.sh config/zookeeper.properties
然后运行run-zookeeper.sh脚本:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./run-zookeeper.sh
等待约30秒左右,等待ZooKeeper启动。
运行Kafka服务器
Kafka还提供了一个名为kafka-server-start.sh的启动脚本,它位于PBLP平台的 ~/bigdata/kafka_2.12-2.4.1/bin/kafka-server-start.sh。
Kafka发行版也提供了一个Kafka配置文件,它是用来运行Kafka单节点的,并且指向运行在localhost:2181上的ZooKeeper。
为了运行Kafka,我们在Kafka安装目录中创建下面这个run-kafka.sh脚本,并在另一个终端窗口中运行它。
#!/usr/bin/env bash cd ~/bigdata/kafka_2.12-2.4.1 ./bin/kafka-server-start.sh config/server.properties
然后运行run-kafka.sh脚本:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./run-kafka.sh
等待大约30秒左右,Kafka启动。
现在让我们创建将要发送记录的主题(topic)。
创建Kafka主题
Kafka还提供了一个名为kafka-topics.sh的实用程序,它位于PBLP平台的bigdata/kafka_2.12-2.4.1/bin/kafka-topics.sh。
我们将使用这个工具创建一个名为my-topic的主题,复制因子为1,因为我们只有一台服务器。为my-topic使用13个分区,这意味着我们最多可以有13个Kafka消费者。
要运行Kafka,创建下面这个create-topic.sh脚本,并在另一个终端窗口中运行它。
#!/usr/bin/env bash cd ~/bigdata/kafka_2.12-2.4.1 # Create a topic ./bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 13 \ --topic my-topic
运行create-topic.sh脚本:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./create-topic.sh
注意,我们创建了一个名为my-topic的主题。
主题列表显示
我们可以像下面这样使用kafka-topics.sh来查看Kafka正在管理哪些topic主题。
创建下面这个list-topics.sh脚本,并在另一个终端窗口运行它。
#!/usr/bin/env bash cd ~/bigdata/kafka_2.12-2.4.1 # List existing topics ./bin/kafka-topics.sh --list --zookeeper localhost:2181
注意,我们必须指定运行在localhost端口2181上的ZooKeeper集群节点的位置。
运行list-topics.sh脚本:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./list-topics.sh
可以在主题列表中看到主题my-topic。
运行Kafka Producer控制台
Kafka发行版提供了一个命令实用程序来从命令行发送消息。它会启动一个终端窗口,我们输入的所有内容都会被发送到Kafka主题。
Kafka提供的这个实用程序是kafka-console-producer.sh,它位于PBLP平台的~/bigdata/kafka_2.12-2.4.1/bin/ kafka-console-producer.sh,用来在命令行上发送消息到一个主题。
创建下面这个start-producer-console.sh脚本,并在另一个终端窗口运行它。
#!/usr/bin/env bash cd ~/bigdata/kafka_2.12-2.4.1 ./bin/kafka-console-producer.sh \ --broker-list localhost:9092 \ --topic my-topic
注意,我们指定了运行在localhost:9092上的Kafka节点。
接下来运行这个start-producer-console.sh脚本,并发送至少4条消息。
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./start-producer-console.sh This is message 1 This is message 2 This is message 3 Message 4 Message 5
为了查看这些消息,我们需要运行消费者控制台。
运行Kafka Consumer控制台
Kafka发行版提供了一个命令实用程序,可以从命令行查看消息。它以各种模式显示消息。
Kafka提供的这个实用程序kafka-console-consumer.sh,位于~/bigdata/kafka_2.12-2.4.1/bin/kafka-console-consumer.sh,用于在命令行上接收来自主题的消息。
创建下面这个start-consumer-console.sh脚本,并在另一个终端窗口运行它。
#!/usr/bin/env bash cd ~/bigdata/kafka_2.12-2.4.1 ./bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic my-topic \ --from-beginning
注意,我们像之前一样指定了运行在localhost:9092的Kafka节点,但是我们也指定了从头开始(--from-beginning)读取my-topic的所有消息。
在另一个终端上运行start-consumer-console.sh:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./start-consumer-console.sh Message 4 This is message 2 This is message 1 This is message 3 Message 5 Message 6 Message 7
注意到,这些消息不是按顺序来的。这是因为我们只有一个消费者,所以它正在从所有13个分区读取消息。顺序只在分区内得到保证。
创建Kafka描述主题脚本
可以使用Kafka-topics.sh脚本查看Kafka主题在Kafka Brokers中的布局。---describe将显示分区、ISR和Broker分区领导。
创建并编辑describe-topics.sh:
#!/usr/bin/env bash cd ~/bigdata/kafka_2.12-2.4.1 # 现有主题列表 ./bin/kafka-topics.sh --describe \ --topic my-failsafe-topic \ --zookeeper localhost:2181
让我们运行kafka-topics.sh --describe,并查看my-failsafe-topic的拓扑结构。
我们将列出哪个Broker拥有哪个分区(哪个分区的leader),以及每个分区的副本和ISR。ISR是最新的副本。记住一共有13个分区。
Kafka主题分区所有权的拓扑如下:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./describe-topics.sh Topic: my-failsafe-topic PartitionCount: 13 ReplicationFactor: 3 Configs: Topic: my-failsafe-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: my-failsafe-topic Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: my-failsafe-topic Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: my-failsafe-topic Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: my-failsafe-topic Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: my-failsafe-topic Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 Topic: my-failsafe-topic Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: my-failsafe-topic Partition: 7 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: my-failsafe-topic Partition: 8 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: my-failsafe-topic Partition: 9 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: my-failsafe-topic Partition: 10 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: my-failsafe-topic Partition: 11 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 Topic: my-failsafe-topic Partition: 12 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
请注意,每个Broker如何作为leader和follower共享分区。此外,看看Kafka如何在每个代理上复制分区。
另外,如果想描述所有的主题,则只指定--describe选项即可,不要再指定--topic选项。
从命令行使用Kafka的常见问题
问:首先运行哪个服务器?
答:需要先运行ZooKeeper,再运行Kafka。
问:用什么工具来创建一个主题?
答:kafka-topics.sh
问:用什么工具来查看主题?
答:kafka-topics.sh
问:使用什么工具在命令行上发送消息?
答:kafka-console-producer.sh
问:使用什么工具来查看一个主题中的消息?
答:kafka-console-consumer.sh
问:为什么消息者收到的消息会出现混乱?
答:这些消息被分成了13个分区。
问:我们如何才能使来自消费者的消息按顺序发送?
答:我们可以只使用一个分区或启动13个消费者。