从命令行删除Kafka Topic

Apache Kafka接收到的每一条消息都存储在日志中,默认情况下,它保持消息168小时,即7天。删除主题或其所有消息有几种方法,下面我们将解释这些方法。

删除主题中的所有消息

Apache Kafka发行版附带了bin/kafka-config.sh脚本,它提供了许多有用的选项来修改Kafka配置。在不同的选项中,我们将使用“--alter”和“--add-config retention.ms”选项临时更改保留策略为1秒,这将删除一个主题中的所有消息。

首先,让我们运行kafka-configs.sh命令来获取保留值。

$ bin/kafka-configs.sh --zookeeper localhost:2181 \
                 --alter --entity-type topics \
                 --add-config retention.ms=1000 \ 
                 --entity-name text_topic 

通过运行下面的命令验证保留策略值是否更改。

$ bin/kafka-configs.sh --zookeeper localhost:2181 \
                 --entity-type topics \
                 --describe \
                 --entity-name text_topic

等待几秒钟,它应该已经从Kafka topic删除了所有的旧消息。现在使用“--delete-config”配置删除保留策略,该配置将把它设置回默认值。

注意:这是一个非常重要的步骤,确保运行它没有失败。否则,将丢失所有未来的消息。

$ bin/kafka-configs.sh --zookeeper localhost:2181 \
                --alter --entity-type topics \
                --delete-config retention.ms \
                --entity-name text_topic

删除Kafka主题并重新创建

在Apache Kafka的最新版本中,删除一个主题是很容易的。只需要将配置中的一个属性设置为'true',并发出删除主题的命令。

在删除现有主题之前,首先获取现有主题的分区和副本,因为需要用相同的配置重新创建它们。可以通过运行kafka-topics.sh脚本并指定选项--describe获得text_topic此信息。

$ bin/kafka-topics.sh --zookeeper localhost:2181 \
                --topic text_topic \
                --describe

主题text_topic有一个复制因子和一个分区。

a) 删除Kafka主题

首先,进入到Kafka的配置目录,它位于PBLP平台的~/bigdata/kafka_2.12-2.4.1/config/目录下。在这里,找到server.properties文件,用任意文本编辑器打开,添加一行“delete.topic.enable=true”,或者,如果属性已经存在,则将属性的值更改为true:

delete.topic.enable=true

现在,进入bin目录,运行kafka-topics.sh命令并指定--delete选项删除主题text_topic:

$ bin/kafka-topics.sh --zookeeper localhost:2181 \
                --topic text_topic \
                --delete

这将从所有Kafka broker分区中移除text_topic。

b) 重新创建Kafka主题

运行bin/kafka-topics.sh命令重新创建主题,带有之前命令获取的复制和分区详细信息。

$ bin/kafka-topics.sh --zookeeper localhost:2181 \
                --create \
                --topic text_topic \
                --replication-factor 1 --partitions 1

这将创建带有复制因子1和分区1的Kafka主题text_topic。

手动删除Kafka日志中的数据

正如本节课开始所说,Kafka将日志中的所有消息存储在log.dir中指定的各自节点中。我们应该从所有节点中删除主题的所有这些消息。下面是要删除的步骤。

  • 停止所有节点的zookeeper和Kafka broker。
  • 清除所有节点的日志。日志文件存储在/tmp/kafka-logs/MyTopic-0,其中/tmp/kafka-logs由log.dir属性指定。
  • 从所有节点重新启动zookeeper和Kafka broker。

《PySpark原理深入与编程实战》