文章目录
- 1.前言
- 2.常用参数
- 2.1 broker 参数
- 2.1.1 log.dirs
- 2.1.2 ZooKeeper 集群
- 2.1.3 数据留存(全局级别)
- 2.2 topic 参数
- 2.2.1 数据留存(topic级别)
- 2.2.2 auto.create.topics.enable
- 2.3 jvm 参数
- 2.4 动态参数
- 2.4.1 概述
- 2.4.2 常用的动态参数
- 3.项目示例
- 3.1 基础配置
- 3.2 主题与日志配置
- 3.3 线程与性能配置
- 3.4 JVM 配置
- 3.5 JMX 配置
- 3.6 其他配置
- 3.7 总结
1.前言
本章我们来讨论下 kafka 中一些重要的参数配置;列举实际项目中 kafka borker 的参数配置,并解释每个参数的意义。
2.常用参数
kafka 官网所有配置参数链接
- https://kafka.apache.org/documentation/#configuration
下面我们只列举一些常用的参数
2.1 broker 参数
2.1.1 log.dirs
- log.dirs:这是非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。
- 比如 log.dirs=/kafka
- log.dir:注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。
2.1.2 ZooKeeper 集群
多个 Kafka 集群使用同一套 ZooKeeper 集群
正确的参数:
- kafka集群1:zk1:2181,zk2:2181,zk3:2181/kafka1
- kafka集群2:zk1:2181,zk2:2181,zk3:2181/kafka2
2.1.3 数据留存(全局级别)
- log.retention.{hours|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低。
- log.retention.hours=72 #3天 默认7天
- log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。
- message.max.bytes:控制 Broker 能够接收的最大消息大小。
- message.max.bytes=83886080
2.2 topic 参数
2.2.1 数据留存(topic级别)
下面是一个创建 topic 的命令
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
- retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。
- retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。
2.2.2 auto.create.topics.enable
- 是否允许自动创建 Topic
- true 表示允许
- false 表示禁止,一般来说如果 kafka 对外,最好是采用 false
- 当生产者给 kafka topic 发送 message 时,如果 topic 不存在,会自动创建这个 topic
2.3 jvm 参数
在启动 Kafka Broker 之前,先设置上这两个环境变量:
堆(heap)默认是 1g
$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
- KAFKA_HEAP_OPTS:指定堆大小。
- KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。
- kafka 是由 scala 语言编写,编译成 class 文件,在 JVM 中运行;所以可以使用 JVM 的参数配置,这点和 Java 项目一样。
2.4 动态参数
2.4.1 概述
官网中的每个参数都有 Update Mode 这个属性
Update Mode
- read-only。被标记为 read-only 的参数和原来的参数行为一样,只有重启 Broker,才能令修改生效。
- per-broker。被标记为 per-broker 的参数属于动态参数,修改它之后,只会在对应的 Broker 上生效。
- cluster-wide。被标记为 cluster-wide 的参数也属于动态参数,修改它之后,会在整个集群范围内生效,也就是说,对所有 Broker 都生效。你也可以为具体的 Broker 修改 cluster-wide 参数。
2.4.2 常用的动态参数
- log.retention.ms。修改日志留存时间应该算是一个比较高频的操作,毕竟,我们不可能完美地预估所有业务的消息留存时长。虽然该参数有对应的主题级别参数可以设置,但拥有在全局层面上动态变更的能力,依然是一个很好的功能亮点。
- num.io.threads 和 num.network.threads。这是我们在前面提到的两组线程池。就我个人而言,我觉得这是动态 Broker 参数最实用的场景了。毕竟,在实际生产环境中,Broker 端请求处理能力经常要按需扩容。如果没有动态 Broker 参数,我们是无法做到这一点的。
- SSL 相关的参数。主要是 4 个参数(ssl.keystore.type、ssl.keystore.location、ssl.keystore.password 和 ssl.key.password)。允许动态实时调整它们之后,我们就能创建那些过期时间很短的 SSL 证书。每当我们调整时,Kafka 底层会重新配置 Socket 连接通道并更新 Keystore。新的连接会使用新的 Keystore,阶段性地调整这组参数,有利于增加安全性。
- num.replica.fetchers。这也是我认为的最实用的动态 Broker 参数之一。Follower 副本拉取速度慢,在线上 Kafka 环境中一直是一个老大难的问题。针对这个问题,常见的做法是增加该参数值,确保有充足的线程可以执行 Follower 副本向 Leader 副本的拉取。现在有了动态参数,你不需要再重启 Broker,就能立即在 Follower 端生效,因此我说这是很实用的应用场景。
3.项目示例
以下是一个实际项目中运行的 kafka docker 镜像中配置的环境变量参数,其中实际服务器的 HostName 换成了 127.0.0.1
- 这些环境变量(大写),在 docker 启动的过程中,会配置到 kafka 对应的参数中进行启动
ALLOW_PLAINTEXT_LISTENER='yes'
KAFKA_CFG_LISTENERS=PLAINTEXT://:23310
KAFKA_CFG_ADVERTISED_HOST_NAME=127.0.0.1
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:16712
KAFKA_CFG_ZOOKEEPER_CONNECT=127.0.0.1:23010/kafka
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR='1'
KAFKA_BROKER_ID=1
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE='false'
KAFKA_CFG_LOG_CLEANUP_POLICY='compact'
KAFKA_CFG_LOG_CLEANER_MIN_COMPACTION_LAG_MS='604800000'
KAFKA_CFG_NUM_IO_THREADS='6'
KAFKA_CFG_NUM_NETWORK_THREADS='20'
KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR='3'
KAFKA_HEAP_OPTS='-Xms6144m -Xmx6144m -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80'
JMX_PORT='23311'
KAFKA_JMX_OPTS='-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=23311'
PORT_MAPPING=16712:23310
3.1 基础配置
ALLOW_PLAINTEXT_LISTENER='yes'
- 允许 Kafka 使用明文(未加密)的监听器。
- 如果设置为
yes
,Kafka 会支持PLAINTEXT
协议的监听器。
KAFKA_CFG_LISTENERS=PLAINTEXT://:23310
- 定义 Kafka 监听的地址和端口。
- 这里使用
PLAINTEXT
协议,监听所有网络接口(:
表示所有 IP)的23310
端口。
KAFKA_CFG_ADVERTISED_HOST_NAME=127.0.0.1
- 定义 Kafka 对外发布的主机名。
- 客户端连接 Kafka 时,会使用这个主机名。
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:16712
- 定义 Kafka 对外发布的监听地址。
- 客户端连接 Kafka 时,会使用这个地址和端口(
16712
)。
KAFKA_CFG_ZOOKEEPER_CONNECT=127.0.0.1:23010/kafka
- 定义 Kafka 连接的 ZooKeeper 地址。
- 这里连接到
127.0.0.1:23010
,并使用/kafka
作为 ZooKeeper 的根路径。
KAFKA_BROKER_ID=1
- 定义 Kafka Broker 的唯一标识符。
- 每个 Kafka Broker 必须有一个唯一的 ID。
3.2 主题与日志配置
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR='1'
- 定义 Kafka 内部
__consumer_offsets
主题的副本数量。 - 这里设置为
1
,表示该主题只有一个副本。
- 定义 Kafka 内部
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE='false'
- 控制是否自动创建主题。
- 如果设置为
false
,当生产者或消费者尝试访问不存在的主题时,Kafka 不会自动创建该主题。
KAFKA_CFG_LOG_CLEANUP_POLICY='compact'
- 定义 Kafka 日志的清理策略。
compact
表示日志压缩策略,只保留每个键的最新值。
KAFKA_CFG_LOG_CLEANER_MIN_COMPACTION_LAG_MS='604800000'
- 定义日志压缩的最小延迟时间(毫秒)。
- 这里设置为
604800000
毫秒(7 天),表示日志在写入后至少 7 天才会被压缩。
3.3 线程与性能配置
KAFKA_CFG_NUM_IO_THREADS='6'
- 定义 Kafka 处理 I/O 操作的线程数量。
- 增加线程数可以提高 I/O 性能。
KAFKA_CFG_NUM_NETWORK_THREADS='20'
- 定义 Kafka 处理网络请求的线程数量。
- 增加线程数可以提高网络吞吐量。
KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR='3'
- 定义每个数据目录的恢复线程数量。
- 增加线程数可以加快日志恢复速度。
3.4 JVM 配置
KAFKA_HEAP_OPTS='-Xms6144m -Xmx6144m -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80'
- 定义 Kafka JVM 的堆内存和垃圾回收配置。
-Xms6144m -Xmx6144m
:设置 JVM 堆内存的初始和最大大小为 6GB。-XX:MetaspaceSize=96m
:设置 Metaspace 的初始大小为 96MB。-XX:+UseG1GC
:使用 G1 垃圾回收器。-XX:MaxGCPauseMillis=20
:设置垃圾回收的最大暂停时间为 20 毫秒。-XX:InitiatingHeapOccupancyPercent=35
:当堆使用率达到 35% 时,启动垃圾回收。-XX:G1HeapRegionSize=16M
:设置 G1 的堆区域大小为 16MB。-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
:控制 Metaspace 的空闲比例。
3.5 JMX 配置
JMX_PORT='23311'
- 定义 Kafka JMX 监控的端口。
- 这里设置为
23311
。
KAFKA_JMX_OPTS='-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=23311'
- 定义 Kafka JMX 的配置。
-Dcom.sun.management.jmxremote
:启用 JMX 远程监控。-Dcom.sun.management.jmxremote.authenticate=false
:禁用 JMX 认证。-Dcom.sun.management.jmxremote.ssl=false
:禁用 JMX SSL。-Djava.rmi.server.hostname=127.0.0.1
:设置 JMX 的 RMI 主机名。-Dcom.sun.management.jmxremote.rmi.port=23311
:设置 JMX 的 RMI 端口。
3.6 其他配置
PORT_MAPPING=16712:23310
- 定义端口映射关系。
- 这里将外部端口
16712
映射到内部端口23310
。
3.7 总结
这些参数共同配置了 Kafka 的网络、性能、日志、JVM 和监控行为。通过合理调整这些参数,可以优化 Kafka 的性能、稳定性和可观测性。