Apache Kafka 自带了一个简化版的 Apache ZooKeeper,用于管理 Kafka 集群的元数据和状态信息。ZooKeeper 是一个分布式协调服务,广泛用于分布式系统中的配置管理、命名服务、分布式锁等场景。
在 Kafka 中,ZooKeeper 扮演着以下几个重要角色:
配置存储:Kafka 使用 ZooKeeper 存储集群的配置信息,包括主题(topic)和分区(partition)的分配、消费者组(consumer group)的偏移量(offset)等。这些配置信息被保存在 ZooKeeper 的数据节点(ZNode)中。
选举协调:Kafka 的每个分区都由一个 broker 负责作为其 leader,并有零个或多个 follower。ZooKeeper 用于协调 broker 之间的选举过程,确保每个分区都有可用的 leader。
偏移量管理:消费者可以通过 ZooKeeper 来存储和获取其在每个分区上的消费偏移量。ZooKeeper 跟踪每个消费者组的偏移量,并允许消费者在重新加入时从上次停止的位置继续消费。
需要注意的是,自带的 ZooKeeper 是用于管理 Kafka 内部的元数据和状态,而不是用于其他用途的通用 ZooKeeper 集群。如果需要在其他应用程序中使用 ZooKeeper,通常建议单独部署一个独立的 ZooKeeper 集群。
从 Kafka 2.8.0 版本开始,Kafka 引入了一种新的集群协调机制,称为 Kafka Raft Metadata Mode,它逐渐取代了依赖 ZooKeeper 的元数据管理。在新版本中,ZooKeeper 的依赖性将逐渐减弱,未来版本可能完全去除对 ZooKeeper 的依赖。
环境:
服务器:一台CentOS7.9
Kafka:kafka_2.13-2.7.1,其中依赖的zk版本为:zookeeper: "3.5.9"
ZK启动脚本
zk_node1_startup.sh
#!/bin/bash
exec nohup /opt/kafka_node1/bin/zookeeper-server-start.sh /opt/kafka_node1/config/zookeeper.properties > ./zk_node1.out 2>&1 &tail -f ./zk_node1.out
zk_node2_startup.sh
#!/bin/bash
exec nohup /opt/kafka_node2/bin/zookeeper-server-start.sh /opt/kafka_node2/config/zookeeper.properties > ./zk_node2.out 2>&1 &tail -f ./zk_node2.out
zk_node3_startup.sh
#!/bin/bash
exec nohup /opt/kafka_node3/bin/zookeeper-server-start.sh /opt/kafka_node3/config/zookeeper.properties > ./zk_node3.out 2>&1 &tail -f ./zk_node3.out
Kafka启动脚本
kafka_node1_startup.sh
#!/bin/bash
exec nohup /opt/kafka_node1/bin/kafka-server-start.sh /opt/kafka_node1/config/server.properties > ./kafka_node1.out 2>&1 &tail -f ./kafka_node1.out
kafka_node2_startup.sh
#!/bin/bash
exec nohup /opt/kafka_node2/bin/kafka-server-start.sh /opt/kafka_node2/config/server.properties > ./kafka_node2.out 2>&1 &tail -f ./kafka_node2.out
kafka_node3_startup.sh
#!/bin/bash
exec nohup /opt/kafka_node3/bin/kafka-server-start.sh /opt/kafka_node3/config/server.properties > ./kafka_node3.out 2>&1 &tail -f ./kafka_node3.out
SpringBoot集成Kafka
配置文件
spring:kafka:bootstrap-servers: IP:9192,IP:9292,IP:9392producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:max.block.ms: 8000consumer:group-id: default-groupIdkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer