这里写目录标题
- 1. 创建主题
- 1). 自动创建;(不建议)
- 2. 通过kafka-topics.sh 创建:
- 3. 通过 TopicCommand 创建:
- 2. 查看主题
- 2. 其他操作
1. 创建主题
1). 自动创建;(不建议)
auto.create.topics.enable设置为true(默认值就是true),那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions (默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。
一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会按照配置参数num.partitions和default.replication.factor的值来创建一个相应的主题
2. 通过kafka-topics.sh 创建:
创建一个名为“topic_demo_01 ”,10分区,2副本存储的主题
bin/kafka-topics.sh --bootstrap-server 10.50.4.01:9392 --create --topic topic_demo_01 --partitions 10 --replication-factor 2
注: 收到的提示:
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
因为主题中含有下划线会和’.'冲突,Kafka的内部做埋点时会根据主题的名称来命名metrics的名称,并且会将点号“.”改成下画线“_”。假设遇到一个名称为“topic.1_2”的主题,还有一个名称为“topic_1.2”的主题,那么最后的metrics的名称都会为“topic_1_2”,这样就发生了名称冲突。
参数说明:
- –bootstrap-server 10.50.4.01:9392:
指定用于连接的引导服务器(bootstrap server)后面跟着kafka broker列表,具体可以从kafka集群配置文件server.properties里的监听端口。如果指定多个用逗号分割。
注:在以前的版本会指定zookeeper连接字符串,如–zookeeper 10.50.4.01:2181 - –create
表示创建命令 - –topic topic_demo_01
指定topic名称为:topic_demo_01 - –partitions 10
指定分区数为10,也就是这个topic会创建10个分区来存储数据。 - –replication-factor 2
表示主题数据会存两副本,既一主一从。
kafka topic底层存储:
通过kafka的配置文件server.properties中的“log.dirs=/data/kafka/data”可以找到kafka topic数据持久化存储的地方可以看到主题创建的10个分区文件夹。
集群共三个broker,我们创建了10分区,两副本,因此共产生20个文件夹,分散到三个broker中,如下图:
打开其中一个文件夹里面包含4个文件:
**.index文件:**这些文件包含了一个偏移量(offset)和物理文件中一个特定位置的映射。它们被用于快速定位消息的位置。
**.log文件:**这些文件实际存储了Kafka的消息数据。每个主题的每个分区都会有一个对应的.log文件。
**.timeindex文件:**这些文件包含了一个时间戳和物理文件中一个特定位置的映射。它们被用于根据时间戳快速定位消息的位置。
leader-epoch-checkpoint
是Kafka 中用于维护副本管理的重要组成部分,它记录了每个副本的 leader epoch(领导者周期)。这个文件存储在每个副本对应的日志目录下。
当 Kafka 副本因 Leader 选举而改变身份时,比如从 Follower 变为 Leader 或从 Leader 变为 Follower,它都会更新这个文件。这个文件的主要目的是确保副本能够准确地重建其 Leader Epoch 状态,特别是在副本重启或者 Leader 更换时。
主题、分区、日志的关系:
通过 describe 命令查看细节:
通过describe指令类型来查看分区副本的分配细节
bin/kafka-topics.sh --bootstrap-server 10.53.4.27:9392 --describe --topic topic_demo_01
返回信息:
Topic:topic_demo_01 PartitionCount:10 ReplicationFactor:2 Configs:cleanup.policy=delete,flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=52428800,retention.bytes=80530636800Topic: topic_demo_01 Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2Topic: topic_demo_01 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1Topic: topic_demo_01 Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0Topic: topic_demo_01 Partition: 3 Leader: 1 Replicas: 1,0 Isr: 1,0Topic: topic_demo_01 Partition: 4 Leader: 0 Replicas: 0,2 Isr: 0,2Topic: topic_demo_01 Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1Topic: topic_demo_01 Partition: 6 Leader: 1 Replicas: 1,2 Isr: 1,2Topic: topic_demo_01 Partition: 7 Leader: 0 Replicas: 0,1 Isr: 0,1Topic: topic_demo_01 Partition: 8 Leader: 2 Replicas: 2,0 Isr: 2,0Topic: topic_demo_01 Partition: 9 Leader: 1 Replicas: 1,0 Isr: 1,0
Leader表示分区的leader副本所对应的brokerId,Isr表示分区的ISR集合,Replicas表示分区的所有的副本分配情况,即AR集合,其中的数字都表示的是brokerId。
replica-assignment
以上创建主题时的分区副本都是按照既定的内部逻辑来进行分配的,也可以利用replica-assignment进行,一般不用。
3. 通过 TopicCommand 创建:
kafka-topics.sh 脚本实质上是调用了kafka.admin.TopicCommand 类
添加Maven依赖:
调用TopicCommand 创建,本质与kafka-topics.sh创建没有区别
2. 查看主题
- 查看多个主题:
上文利用describe查看单个主题信息,也可以传入多个主题,用逗号分割
如:
bin/kafka-topics.sh --bootstrap-server 10.53.4.27:9392 --describe --topic topic_demo_01,topic_demo_02
- 列出包含了与集群不一样配置的主题:
bin/kafka-topics.sh --bootstrap-server 10.53.4.27:9392 --describe --topics-with-overrides
- 找出所有包含失效副本的分区
包含失效副本的分区可能正在进行同步操作,也有可能同步发生异常,此时分区的ISR集合小于AR 集合。对于通过该参数查询到的分区要重点监控,因为这很可能意味着集群中的某个broker已经失效或同步效率降低等。
bin/kafka-topics.sh --bootstrap-server 10.53.4.27:9392 --describe --under-replicated-partitions
- 查看主题中没有 leader 副本的分区
这些分区已经处于离线状态,对于外界的生产者和消费者来说处于不可用的状态。
bin/kafka-topics.sh --bootstrap-server 10.53.4.27:9392 --describe --unavailable-partitions
2. 其他操作
-
修改主题
kafka-topics.sh 脚本中的alter指令提供。可以修改分区数和配置。分区数仅支持新增,不支持修改。当新增分区数时,消息如果含有key,会重新计算分区号,对消息重新分配,会打乱原有的顺序。因此对分区的修改要慎重,建议初始时设计好分区。 -
删除主题
kafka-topics.sh脚本中的delete指令就可以用来删除主题,
tips:broker端配置参数delete.topic.enable 有关。必须将delete.topic.enable参数配置为true才能够删除主题,这个参数的默认值就是true,如果配置为false,那么删除主题的操作将会被忽略。如果要删除的主题是 Kafka 的内部主题,那么删除时就会报错。