Kafka ConsumerConfig
中的配置项用于定义消费者的行为,如消费方式、偏移管理、组协调等。以下是ConsumerConfig
中的关键配置项及其详细说明:
1. bootstrap.servers
- 类型:
List<String>
- 说明:Kafka集群的地址列表,消费者会从这些地址拉取数据,通常设置多个地址以防单个节点故障。
2. group.id
- 类型:
String
- 说明:消费者组的ID。属于同一个组的消费者会协同消费一个或多个分区内的消息,每个消息只会被一个组内的消费者读取。
3. enable.auto.commit
- 类型:
Boolean
- 默认值:
true
- 说明:是否自动提交偏移量。如果为
true
,则消费者会在每次轮询后自动提交当前的偏移量。
4. auto.commit.interval.ms
- 类型:
Integer
- 默认值:5000
- 说明:自动提交偏移量的时间间隔,单位为毫秒。仅在
enable.auto.commit=true
时生效。
5. auto.offset.reset
- 类型:
String
- 默认值:
latest
- 说明:消费者在无法找到其偏移量时的行为。可选值包括:
latest
:从最新的数据开始消费(默认)。earliest
:从最早的数据开始消费。none
:如果没有偏移量,抛出异常。
6. fetch.min.bytes
- 类型:
Integer
- 默认值:1
- 说明:消费者从服务器拉取数据时的最小字节数,只有达到这个值时,Kafka才会返回数据。提高此值可以减少请求频率,但会增加延迟。
7. fetch.max.wait.ms
- 类型:
Integer
- 默认值:500
- 说明:消费者等待数据的最大时间。如果没有足够的数据满足
fetch.min.bytes
,Kafka会等待这个时间后返回数据。
8. max.poll.records
- 类型:
Integer
- 默认值:500
- 说明:单次轮询中可以拉取的最大消息数。调小此值可以减轻消费者的负担,但会增加拉取频率。
9. session.timeout.ms
- 类型:
Integer
- 默认值:10000
- 说明:消费者与协调器保持连接的超时时间,如果消费者在该时间内未向协调器发送心跳,协调器会认为该消费者已离开并进行再平衡。
10. heartbeat.interval.ms
- 类型:
Integer
- 默认值:3000
- 说明:消费者发送心跳的频率。应小于
session.timeout.ms
,用于维持消费者的活跃状态。
11. max.poll.interval.ms
- 类型:
Integer
- 默认值:300000(5分钟)
- 说明:消费者两次调用
poll
方法之间的最大允许时间。如果超时,则消费者会被认为无响应并触发再平衡。
12. isolation.level
- 类型:
String
- 默认值:
read_uncommitted
- 说明:控制事务消费的隔离级别。可选值:
read_uncommitted
:读取所有消息(包括未提交的事务消息)。read_committed
:只读取已提交的消息。
13. client.id
- 类型:
String
- 说明:客户端ID,用于在监控和日志中识别客户端。
14. receive.buffer.bytes
/ send.buffer.bytes
- 类型:
Integer
- 默认值:65536(64KB) / 131072(128KB)
- 说明:TCP接收和发送缓冲区大小。适当调整这些值可以提高网络吞吐量。
15. max.partition.fetch.bytes
- 类型:
Integer
- 默认值:1048576(1MB)
- 说明:消费者单次从每个分区拉取的最大数据量。这个值越大,单次拉取的数据就越多。
16. connections.max.idle.ms
- 类型:
Long
- 默认值:540000
- 说明:客户端与服务器之间连接的最大空闲时间。超过该时间,Kafka会关闭空闲连接以释放资源。
17. request.timeout.ms
- 类型:
Integer
- 默认值:30000(30秒)
- 说明:消费者请求的超时时间。在该时间内如果没有响应,则消费者会认为请求超时。
18. metrics.recording.level
- 类型:
String
- 默认值:
INFO
- 说明:控制消费者的度量记录级别。可选值为
INFO
(记录核心指标)和DEBUG
(记录更多指标)。
19. metric.reporters
- 类型:
List<String>
- 说明:指定度量报告器类的列表,可以将消费者的度量数据导出到外部系统,例如Prometheus或自定义的监控系统。
20. check.crcs
- 类型:
Boolean
- 默认值:
true
- 说明:是否验证消息的CRC(校验和)。开启后可以确保消息完整性,但会稍微增加性能开销。
21. interceptor.classes
- 类型:
List<String>
- 说明:拦截器类的列表,允许在消息被消费前或消费后进行拦截和处理,适合监控、改写消息等场景。
22. partition.assignment.strategy
- 类型:
List<String>
- 默认值:
RangeAssignor
- 说明:消费者组的分区分配策略。可选值包括:
RangeAssignor
:按分区范围进行分配。RoundRobinAssignor
:轮询分配。StickyAssignor
:优先保持消费者分配的稳定性。CooperativeStickyAssignor
:部分分配,再平衡时减少中断。适合大数据量的分区。
23. fetch.max.bytes
- 类型:
Integer
- 默认值:52428800(50MB)
- 说明:消费者单次拉取的最大数据量,用于控制批量处理的数据量上限。
24. reconnect.backoff.ms
/ reconnect.backoff.max.ms
- 类型:
Long
- 默认值:50 / 1000
- 说明:客户端连接失败后的重试时间间隔。backoff时间会随着重试次数指数增加,但不会超过
reconnect.backoff.max.ms
。 - 规则:org.apache.kafka.common.utils.ExponentialBackoff
25. retry.backoff.ms
- 类型:
Long
- 默认值:100
- 说明:请求失败后的重试间隔时间。客户端会在该时间后进行重试。
这些配置项可以帮助用户精细化控制Kafka消费者的行为,包括消费者组、消息拉取、分区分配、超时设置等。根据需求合理配置,可以优化消费者的性能和稳定性。