从命令行使用kafka

这一节我们将通过一个简单的示例,演示如何使用Kafka命令行中的生产者(Producer)和消费者(Consumer)。

我们将运行ZooKeeper,然后运行Kafka服务器/Broker。我们将使用一些Kafka命令行实用程序,来创建Kafka主题,通过生产者发送消息,并从命令行使用/消费消息。

运行Kafka的ZooKeeper

Kafka依赖于ZooKeeper。为了简单起见,我们将使用单个ZooKeeper节点。

Kafka提供了一个ZooKeeper的启动脚本,叫做zooKeeper-server-start.sh,位于PBLP平台的 ~/bigdata/kafka_2.12-2.4.1/bin/zooKeeper-server-start.sh。

Kafka发行版还提供了一个ZooKeeper配置文件,它被设置为单节点运行。

为了运行ZooKeeper,我们在Kafka安装目录中创建下面这个run-zookeeper.sh脚本并运行它。

#!/usr/bin/env bash
cd ~/bigdata/kafka_2.12-2.4.1

./bin/zookeeper-server-start.sh config/zookeeper.properties

然后运行run-zookeeper.sh脚本:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./run-zookeeper.sh

等待约30秒左右,等待ZooKeeper启动。

运行Kafka服务器

Kafka还提供了一个名为kafka-server-start.sh的启动脚本,它位于PBLP平台的 ~/bigdata/kafka_2.12-2.4.1/bin/kafka-server-start.sh。

Kafka发行版也提供了一个Kafka配置文件,它是用来运行Kafka单节点的,并且指向运行在localhost:2181上的ZooKeeper。

为了运行Kafka,我们在Kafka安装目录中创建下面这个run-kafka.sh脚本,并在另一个终端窗口中运行它。

#!/usr/bin/env bash
cd ~/bigdata/kafka_2.12-2.4.1

./bin/kafka-server-start.sh config/server.properties

然后运行run-kafka.sh脚本:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./run-kafka.sh

等待大约30秒左右,Kafka启动。

现在让我们创建将要发送记录的主题(topic)。

创建Kafka主题

Kafka还提供了一个名为kafka-topics.sh的实用程序,它位于PBLP平台的bigdata/kafka_2.12-2.4.1/bin/kafka-topics.sh。

我们将使用这个工具创建一个名为my-topic的主题,复制因子为1,因为我们只有一台服务器。为my-topic使用13个分区,这意味着我们最多可以有13个Kafka消费者。

要运行Kafka,创建下面这个create-topic.sh脚本,并在另一个终端窗口中运行它。

#!/usr/bin/env bash

cd ~/bigdata/kafka_2.12-2.4.1

# Create a topic
./bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 13 \
  --topic my-topic

运行create-topic.sh脚本:

$ cd ~/bigdata/kafka_2.12-2.4.1

$ ./create-topic.sh

注意,我们创建了一个名为my-topic的主题。

主题列表显示

我们可以像下面这样使用kafka-topics.sh来查看Kafka正在管理哪些topic主题。

创建下面这个list-topics.sh脚本,并在另一个终端窗口运行它。

#!/usr/bin/env bash

cd ~/bigdata/kafka_2.12-2.4.1

# List existing topics
./bin/kafka-topics.sh --list --zookeeper localhost:2181

注意,我们必须指定运行在localhost端口2181上的ZooKeeper集群节点的位置。

运行list-topics.sh脚本:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./list-topics.sh

可以在主题列表中看到主题my-topic。

运行Kafka Producer控制台

Kafka发行版提供了一个命令实用程序来从命令行发送消息。它会启动一个终端窗口,我们输入的所有内容都会被发送到Kafka主题。

Kafka提供的这个实用程序是kafka-console-producer.sh,它位于PBLP平台的~/bigdata/kafka_2.12-2.4.1/bin/ kafka-console-producer.sh,用来在命令行上发送消息到一个主题。

创建下面这个start-producer-console.sh脚本,并在另一个终端窗口运行它。

#!/usr/bin/env bash
cd ~/bigdata/kafka_2.12-2.4.1

./bin/kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic my-topic

注意,我们指定了运行在localhost:9092上的Kafka节点。

接下来运行这个start-producer-console.sh脚本,并发送至少4条消息。

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-producer-console.sh

This is message 1
This is message 2
This is message 3
Message 4
Message 5

为了查看这些消息,我们需要运行消费者控制台。

运行Kafka Consumer控制台

Kafka发行版提供了一个命令实用程序,可以从命令行查看消息。它以各种模式显示消息。

Kafka提供的这个实用程序kafka-console-consumer.sh,位于~/bigdata/kafka_2.12-2.4.1/bin/kafka-console-consumer.sh,用于在命令行上接收来自主题的消息。

创建下面这个start-consumer-console.sh脚本,并在另一个终端窗口运行它。

#!/usr/bin/env bash
cd ~/bigdata/kafka_2.12-2.4.1

./bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic my-topic \
    --from-beginning

注意,我们像之前一样指定了运行在localhost:9092的Kafka节点,但是我们也指定了从头开始(--from-beginning)读取my-topic的所有消息。

在另一个终端上运行start-consumer-console.sh:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-console.sh

Message 4
This is message 2
This is message 1
This is message 3
Message 5
Message 6
Message 7

注意到,这些消息不是按顺序来的。这是因为我们只有一个消费者,所以它正在从所有13个分区读取消息。顺序只在分区内得到保证。

创建Kafka描述主题脚本

可以使用Kafka-topics.sh脚本查看Kafka主题在Kafka Brokers中的布局。---describe将显示分区、ISR和Broker分区领导。

创建并编辑describe-topics.sh:

#!/usr/bin/env bash

cd ~/bigdata/kafka_2.12-2.4.1

# 现有主题列表
./bin/kafka-topics.sh --describe \
    --topic my-failsafe-topic \
    --zookeeper localhost:2181

让我们运行kafka-topics.sh --describe,并查看my-failsafe-topic的拓扑结构。

我们将列出哪个Broker拥有哪个分区(哪个分区的leader),以及每个分区的副本和ISR。ISR是最新的副本。记住一共有13个分区。

Kafka主题分区所有权的拓扑如下:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./describe-topics.sh

Topic: my-failsafe-topic    PartitionCount: 13    ReplicationFactor: 3    Configs:
    Topic: my-failsafe-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: my-failsafe-topic    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: my-failsafe-topic    Partition: 2    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: my-failsafe-topic    Partition: 3    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: my-failsafe-topic    Partition: 4    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: my-failsafe-topic    Partition: 5    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: my-failsafe-topic    Partition: 6    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: my-failsafe-topic    Partition: 7    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: my-failsafe-topic    Partition: 8    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: my-failsafe-topic    Partition: 9    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: my-failsafe-topic    Partition: 10    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: my-failsafe-topic    Partition: 11    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: my-failsafe-topic    Partition: 12    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1

请注意,每个Broker如何作为leader和follower共享分区。此外,看看Kafka如何在每个代理上复制分区。

另外,如果想描述所有的主题,则只指定--describe选项即可,不要再指定--topic选项。


从命令行使用Kafka的常见问题

问:首先运行哪个服务器?

答:需要先运行ZooKeeper,再运行Kafka。

问:用什么工具来创建一个主题?

答:kafka-topics.sh

问:用什么工具来查看主题?

答:kafka-topics.sh

问:使用什么工具在命令行上发送消息?

答:kafka-console-producer.sh

问:使用什么工具来查看一个主题中的消息?

答:kafka-console-consumer.sh

问:为什么消息者收到的消息会出现混乱?

答:这些消息被分成了13个分区。

问:我们如何才能使来自消费者的消息按顺序发送?

答:我们可以只使用一个分区或启动13个消费者。


《PySpark原理深入与编程实战》