发布日期:2025-06-16 VIP内容

Kafka 4.0.0集群安装与配置

Apache Kafka 4.0.0 于 2025 年 3 月 18 日 正式发布,这不仅仅是一次常规的版本更新,它更是一个重要的里程碑,标志着首个完全剔除 Apache ZooKeeper® 即可运行的重大版本发布。通过KRaft 模式运行,Kafka 简化部署和管理工作,消除维护独立 ZooKeeper 集群的复杂性,打破ZooKeeper自身缺陷造成Kafka集群规模的限制。

ZooKeeper-based架构和KRaft-based架构

ZooKeeper-based 和 KRaft-based 是 Kafka 集群的两种不同架构模式,主要区别在于元数据管理和协调机制。以下是详细对比:

特性 ZooKeeper-based 架构 KRaft-based 架构
协调服务 依赖 ZooKeeper(外部系统) 内置 Raft 协议(Kafka 自身实现)
元数据存储 主要存于ZooKeeper,Kafka保存部分副本 完全由Kafka节点(Controller)管理
架构复杂度 需维护 ZooKeeper 集群(额外资源) 单集群架构,简化运维
故障恢复 依赖 ZooKeeper 选举,恢复时间较长 内置 Controller 快速选举(毫秒级)
扩展性 元数据操作受限于 ZooKeeper 性能 水平扩展能力更强(尤其大规模集群)
典型版本 Kafka 2.8.x 及以前 Kafka 3.3.x 及以后(推荐 3.5+)

Kafka 4.0 默认启用KRaft 模式(Kafka Raft),完全移除ZooKeeper依赖。其核心原理如下:

  • 元数据自管理:基于 Raft 共识算法,将元数据存储于内置的__cluster_metadata主题中,由 Controller 节点(通过选举产生)统一管理。
  • 日志复制机制:所有 Broker 作为 Raft 协议的 Follower,实时复制 Controller 的元数据日志,确保强一致性。
  • 快照与恢复:定期生成元数据快照,避免日志无限增长,故障恢复时间从 ZK 时代的分钟级优化至秒级。

Kafka 4.0.0集群安装步骤

一、环境准备

1. 系统要求

  • 准备搭建Kafka集群的三台机器(建议配置:8GB RAM, 2 核 CPU, 50GB 磁盘)。
  • 操作系统: 选择CentOS 8(UBuntu 20.x也可以)。
  • Java:已安装JDK17,并配置好环境变量。注意,在 Kafka 4.0 中,Kafka 客户端和 Kafka Streams 需要 Java 11,而 Kafka Broker、Controller、Connect、Tools 等服务端组件现在需要 Java 17 运行。
  • 防火墙:开放 9092(Broker)和 9093(Controller)端口。

2. 主机名配置(示例)

编辑 /etc/hosts,添加以下内容(三台机器均需配置):

	192.168.174.101 worker1
	192.168.174.102 worker2
	192.168.174.103 worker3

3. 用户名:hduser

二、下载并解压 Kafka 4.0.0

以下操作在所有节点机器上执行。

1. 下载Kafka 4.0.0

$ wget https://downloads.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz

2. 解压到~/bigdata/目录下

$ tar -xzf kafka_2.13-4.0.0.tgz -C /home/hduser/bigdata/

3. 添加环境变量

打开/etc/profile文件:

$ sudo nano /etc/profile

添加如下内容:

export KAFKA_HOME=/home/hduser/bigdata/kafka_2.13-4.0.0
export PATH=$PATH:$KAFKA_HOME/bin

保存内容,并关闭文件。

4. 执行如下命令,使配置生效:

$ source /etc/profile

三、配置 KRaft 集群

1. 配置各节点的 server.properties 属性文件。

各节点的server.properties文件位于$KAFKA_HOME/config/目录下。

节点worker1(192.168.174.101)的配置如下(只列出修改内容):

	###  Server Basic配置 ###

	# 这个服务器所扮演的角色,用来设置 KRaft模式
	process.roles=broker,controller

	# 与这个实例角色相关联的节点id(每个节点必须是唯一的)
	node.id=1

	# 初始连接点(至少3个节点,确保法定人数),存储格式化时使用的初始Controller节点
	controller.quorum.bootstrap.servers=worker1:9093,worker2:9093,worker3:9093

	# 运行时的Controller集群完整列表(包含所有节点)
	controller.quorum.voters=1@worker1:9093,2@worker2:9093,3@worker3:9093

	###  Socket Server Settings 配置###

	# socket服务器监听的地址
	listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093

	# 用户节点节通信的监听器名称
	inter.broker.listener.name=PLAINTEXT

	# 建议客户端连接的监听器的名称,broker的主机名和端口号
	# 如果没有设置,它使用 "listeners" 的值
	advertised.listeners=PLAINTEXT://worker1:9092,CONTROLLER://worker1:9093

	# controller所使用的监听器的名称,以KRaft模式运行时必须
	controller.listener.names=CONTROLLER

	### Log Basics ###
	
	# 用逗号分隔的目录列表,在这些目录下存储日志文件
	log.dirs=/home/hduser/bigdata/kafka_2.13-4.0.0/kraft-combined-logs

节点worker2(192.168.174.102)的配置如下(仅node.id和advertised.listeners不同):

	###  Server Basic配置 ###

	# 这个服务器所扮演的角色,用来设置 KRaft模式
	process.roles=broker,controller

	# 与这个实例角色相关联的节点id(每个节点必须是唯一的)
	node.id=2

	# 初始连接点(至少3个节点,确保法定人数),存储格式化时使用的初始Controller节点
	controller.quorum.bootstrap.servers=worker1:9093,worker2:9093,worker3:9093

	# 运行时的Controller集群完整列表(包含所有节点)
	controller.quorum.voters=1@worker1:9093,2@worker2:9093,3@worker3:9093

	###  Socket Server Settings 配置###

	# socket服务器监听的地址
	listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093

	# 用户节点节通信的监听器名称
	inter.broker.listener.name=PLAINTEXT

	# 建议客户端连接的监听器的名称,broker的主机名和端口号
	# 如果没有设置,它使用 "listeners" 的值
	advertised.listeners=PLAINTEXT://worker2:9092,CONTROLLER://worker2:9093

	# controller所使用的监听器的名称,以KRaft模式运行时必须
	controller.listener.names=CONTROLLER

	### Log Basics ###
	
	# 用逗号分隔的目录列表,在这些目录下存储日志文件
	log.dirs=/home/hduser/bigdata/kafka_2.13-4.0.0/kraft-combined-logs 

节点worker3(192.168.174.103)的配置如下(仅node.id和advertised.listeners不同):

	###  Server Basic配置 ###

	# 这个服务器所扮演的角色,用来设置 KRaft模式
	process.roles=broker,controller

	# 与这个实例角色相关联的节点id(每个节点必须是唯一的)
	node.id=3

	# 初始连接点(至少3个节点,确保法定人数),存储格式化时使用的初始Controller节点
	controller.quorum.bootstrap.servers=worker1:9093,worker2:9093,worker3:9093

	# 运行时的Controller集群完整列表(包含所有节点)
	controller.quorum.voters=1@worker1:9093,2@worker2:9093,3@worker3:9093

	###  Socket Server Settings 配置###

	# socket服务器监听的地址
	listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093

	# 用户节点节通信的监听器名称
	inter.broker.listener.name=PLAINTEXT

	# 建议客户端连接的监听器的名称,broker的主机名和端口号
	# 如果没有设置,它使用 "listeners" 的值
	advertised.listeners=PLAINTEXT://worker3:9092,CONTROLLER://worker3:9093

	# controller所使用的监听器的名称,以KRaft模式运行时必须
	controller.listener.names=CONTROLLER

	### Log Basics ###
	
	# 用逗号分隔的目录列表,在这些目录下存储日志文件
	log.dirs=/home/hduser/bigdata/kafka_2.13-4.0.0/kraft-combined-logs 

2. 创建存储目录(所有机器)

创建用来存储日志文件的目录,与配置中的log.dir值相同。命令如下:

$ mkdir -p /home/hduser/bigdata/kafka_2.13-4.0.0/kraft-combined-logs

四、格式化存储(所有机器)

1. 生成 Cluster ID

生成一个唯一的集群ID。任选一台机器执行(执行一次即可),执行如下命令:

$ cd /home/hduser/bigdata/kafka_2.13-4.0.0
$ KAFKA_CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
$ echo $KAFKA_CLUSTER_ID 

kafka-storage.sh random-uuid 的核心功能是生成一个符合 RFC 4122 的 UUID。可以使用其他工具(如 Python、uuidgen 命令)手动生成 UUID,然后直接替换。 例如,也可以使用Linux系统自带的uuidgen命令。大多数 Linux 系统预装了 uuidgen 工具,直接生成 UUID:

# 生成 UUID
$ KAFKA_CLUSTER_ID=$(uuidgen)

保存此ID,后续所有机器配置使用。

2. 格式化存储目录(所有节点)

依次在每台机器上执行如下命令,以格式化每个节点上的存储目录:

# kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/server.properties   
# 替换为上一步生成的Cluster ID
$ kafka-storage.sh format -t 18e07de8-03b2-44d8-b6ce-822200c62183 -c  $KAFKA_HOME/config/server.properties

五、启动Kafka集群

在所有机器上启动Kafka服务器。依次在每台机器上执行如下命令:

$ kafka-server-start.sh $KAFKA_HOME/config/server.properties

然后使用jsp命令查看是否有名为"kafka"的进程。

六、验证集群

1. 创建、查看和删除Topic

# 创建2分区、3副本的主题,主题名为test:
$ kafka-topics.sh --create   --bootstrap-server worker1:9092  --topic test   --partitions 2  --replication-factor 3

# 查看主题:
$ kafka-topics.sh --bootstrap-server worker1:9092 --list

# 查看主题详情
$ kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic test

# 查看分区分布:
$ kafka-topics.sh --bootstrap-server worker1:9092 --describe 

# 删除Topic
$ kafka-topics.sh --bootstrap-server worker1:9092 --delete --topic test

2. 测试生产者和消费者

先启动消费者进程:

# 启动控制台消费者
$ kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server worker1:9092

再启动生产者进程:

# 启动控制台生产者 
$ kafka-console-producer.sh --topic test --bootstrap-server worker1:9092

这时在控制台生产者窗口随意输入一些文本,按回车键发送。在控制台消费者窗口应该马上能看到收到的文本消息。

通过以上步骤,我们成功搭建一个三节点的 Kafka 4.0.0 KRaft 集群。