目录
- 初识Kafka
- 基本概念
- 安装与配置
- ZooKeeper安装与配置
- Kafka的安装与配置
- 生产与消费
- 服务端参数配置
- 生产者
- 客户端开发
- 消息对象 ProducerRecord
- 必要的参数配置
- 发送消息
- 序列化
- 分区器
- 生产者拦截器
- 原理分析
- 整体架构
- 元数据的更新
- 重要的生产者参数
- acks
- max.request.size
- retries和retry.backoff.ms
- compression.type
- connections.max.idle.ms
- linger.ms
- receive.buffer.bytes
- send.buffer.bytes
- request.timeout.ms
- 消费者
- 消费者与消费组
- 客户端开发
- 必要的参数配置
- 订阅主题与分区
- 消息消费
- 位移提交
- 控制或关闭消费
- 指定位移消费
- 再均衡
- 消费者拦截器
- 多线程实现
- 重要的消费者参数
- fetch.min.bytes
- fetch.max.bytes
- fetch.max.wait.ms
- max.partition.fetch.bytes
- max.poll.records
- connections.max.idle.ms
- exclude.internal.topics
- receive.buffer.bytes
- send.buffer.bytes
- request.timeout.ms
- metadata.max.age.ms
- reconnect.backoff.ms
- retry.backoff.ms
- isolation.level
- 主题与分区
- 主题的管理
- 创建主题
- 分区副本的分配
- 查看主题
- 修改主题
- 配置管理
- 删除主题
- 初识KafkaAdminClient
- 基本使用
- 分区的管理
- 优先副本的选举
- 分区重分配
- 复制限流
- 修改副本因子
- 如何选择合适的分区数
- 日志存储
- 文件目录布局
- 日志格式的演变
- 消息压缩
- 变长字段
- v2版本
初识Kafka
Kafka之所以受到越来越多的青睐,与它所“扮演”的三大角色是分不开的:
- 消息系统:Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
- 存储系统:Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于Kafka 的消息持久化功能和多副本机制,我们可以把Kafka作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
- 流式处理平台:Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。
基本概念
一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer,以及一个ZooKeeper集群。其中ZooKeeper是Kafka用来负责集群元数据的管理、控制器的选举等操作的。Producer将消息发送到Broker,Broker负责将收到的消息存储到磁盘中,而Consumer负责从Broker订阅并消费消息。整个Kafka体系结构中引入了以下3个术语:
- Producer:生产者,也就是发送消息的一方。生产者负责创建消息,然后将其投递到Kafka中。
- Consumer:消费者,也就是接收消息的一方。消费者连接到Kafka上并接收消息,进而进行相应的业务逻辑处理。
- Broker:服务代理节点。对于Kafka而言,Broker可以简单地看作一个独立的Kafka服务节点或Kafka服务实例。大多数情况下也可以将Broker看作一台Kafka服务器,前提是这台服务器上只部署了一个Kafka实例。一个或多个Broker组成了一个Kafka集群。一般而言,我们更习惯使用首字母小写的broker来表示服务代理节点。
在Kafka中还有两个特别重要的概念—**主题(Topic)与分区(Partition)**。Kafka中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。
主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性,不过offset并不跨越分区,也就是说,Kafka保证的是分区有序而不是主题有序。
Kafka中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个broker,以此来提供比单个broker更强大的性能。
每一条消息被发送到broker之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。
Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是“一主多从”的关系,其中leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。副本处于不同的broker中,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务。Kafka通过多副本机制实现了故障的自动转移,当Kafka集群中某个broker失效时仍然能保证服务可用。
生产者和消费者只与leader副本进行交互,而follower副本只负责消息的同步,很多时候follower副本中的消息相对leader副本而言会有一定的滞后。
Kafka 消费端也具备一定的容灾能力。Consumer 使用拉(Pull)模式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费,这样就不会造成消息丢失。
分区中的所有副本统称为AR(Assigned Replicas)。所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas),ISR集合是AR集合中的一个子集。消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消息进行同步,同步期间内follower副本相对于leader副本而言会有一定程度的滞后。前面所说的“一定程度的同步”是指可忍受的滞后范围,这个范围可以通过参数进行配置。与leader副本同步滞后过多的副本(不包括leader副本)组成**OSR(Out-of-Sync Replicas)**,由此可见,AR=ISR+OSR。在正常情况下,所有的follower 副本都应该与 leader 副本保持一定程度的同步,即AR=ISR,OSR集合为空。
leader副本负责维护和跟踪ISR集合中所有follower副本的滞后状态,当follower副本落后太多或失效时,leader副本会把它从ISR集合中剔除。如果OSR集合中有follower副本“追上”了leader副本,那么leader副本会把它从OSR集合转移至ISR集合。默认情况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader,而在OSR集合中的副本则没有任何机会(不过这个原则也可以通过修改相应的参数配置来改变)。
ISR与HW和LEO也有紧密的关系。HW是High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。
LEO是Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offset,LEO的大小相当于当前日志分区中最后一条消息的offset值加1。分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW,对消费者而言只能消费HW之前的消息。(说白了就是没同步的消息还不允许被消费)
由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 副本都复制完,这条消息才会被确认为已成功提交,这种复制方式极大地影响了性能。而在异步复制方式下,follower副本异步地从leader副本中复制数据,数据只要被leader副本写入就被认为已经成功提交。在这种情况下,如果follower副本都还没有复制完而落后于leader副本,突然leader副本宕机,则会造成数据丢失。Kafka使用的这种ISR的方式则有效地权衡了数据可靠性和性能之间的关系。
安装与配置
ZooKeeper安装与配置
ZooKeeper是安装Kafka集群的必要组件,Kafka通过ZooKeeper来实施对元数据信息的管理,包括集群、broker、主题、分区等内容。(不过新版本kafka已经摆脱了zk的依赖,而是把元数据存储到了特定的topic里面)
Kafka的安装与配置
修改broker的配置文件$KAFKA_HOME/conf/server.properties。主要关注以下几个配置参数即可:
如果是单机模式,那么修改完上述配置参数之后就可以启动服务。如果是集群模式,那么只需要对单机模式的配置文件做相应的修改即可:确保集群中每个broker的broker.id配置参数的值不一样,以及listeners配置参数也需要修改为与broker对应的IP地址或域名,之后就可以各自启动服务。注意,在启动 Kafka 服务之前同样需要确保 zookeeper.connect参数所配置的ZooKeeper服务已经正确启动。
生产与消费
Kafka提供了许多实用的脚本工具,存放在$KAFKA_HOME的bin目录下,其中与主题有关的就是 kafka-topics.sh 脚本,下面我们用它演示创建一个分区数为 4、副本因子为 3 的主题topic-demo,示例如下:
其中–zookeeper指定了Kafka所连接的ZooKeeper服务地址,–topic指定了所要创建主题的名称,–replication-factor 指定了副本因子,–partitions 指定了分区个数,–create是创建主题的动作指令。
服务端参数配置
下面挑选一些重要的服务端参数来做细致的说明,这些参数都配置在$KAFKA_HOME/config/server.properties文件中。
- zookeeper.connect:该参数指明broker要连接的ZooKeeper集群的服务地址(包含端口号),没有默认值,且此参数为必填项。可以配置为localhost:2181,如果ZooKeeper集群中有多个节点,则可以用逗号将每个节点隔开,类似于 localhost1:2181,localhost2:2181,localhost3:2181这种格式。最佳的实践方式是再加一个chroot路径,这样既可以明确指明该chroot路径下的节点是为Kafka所用的,也可以实现多个Kafka集群复用一套ZooKeeper集群,这样可以节省更多的硬件资源。包含 chroot 路径的配置类似于 localhost1:2181,localhost2:2181,localhost3:2181/kafka这种,如果不指定chroot,那么默认使用ZooKeeper的根路径。
- listeners:该参数指明broker监听客户端连接的地址列表,即为客户端要连接broker的入口地址列表,配置格式为protocol1://hostname1:port1,protocol2://hostname2:port2,其中protocol代表协议类型,Kafka当前支持的协议类型有PLAINTEXT、SSL、SASL_SSL等,如果未开启安全认证,则使用简单的PLAINTEXT即可。hostname代表主机名,port代表服务端口,此参数的默认值为null
- broker.id:该参数用来指定Kafka集群中broker的唯一标识,默认值为-1。如果没有设置,那么Kafka会自动生成一个。这个参数还和meta.properties文件及服务端参数broker.id.generation.enable和reserved.broker.max.id有关
- log.dir和log.dirs:Kafka 把所有的消息都保存在磁盘上,而这两个参数用来配置Kafka 日志文件存放的根目录。一般情况下,log.dir 用来配置单个根目录,而 log.dirs 用来配置多个根目录(以逗号分隔),但是Kafka并没有对此做强制性限制,也就是说,log.dir和log.dirs都可以用来配置单个或多个根目录。log.dirs 的优先级比 log.dir 高,但是如果没有配置log.dirs,则会以 log.dir 配置为准。默认情况下只配置了 log.dir 参数,其默认值为/tmp/kafka-logs。
- message.max.bytes:该参数用来指定broker所能接收消息的最大值,默认值为1000012(B),约等于976.6KB。如果 Producer 发送的消息大于这个参数所设置的值,那么(Producer)就会报出RecordTooLargeException的异常。如果需要修改这个参数,那么还要考虑max.request.size (客户端参数)、max.message.bytes(topic端参数)等参数的影响。为了避免修改此参数而引起级联的影响,建议在修改此参数之前考虑分拆消息的可行性。
生产者
客户端开发
消息对象 ProducerRecord
这里有必要单独说明的是构建的消息对象 ProducerRecord,它并不是单纯意义上的消息,它包含了多个属性,原本需要发送的与业务相关的消息体只是其中的一个 value 属性,比如“Hello,Kafka!”只是ProducerRecord对象中的一个属性。ProducerRecord类的定义如下(只截取成员变量):
其中topic和partition字段分别代表消息要发往的主题和分区号。headers字段是消息的头部,Kafka 0.11.x版本才引入这个属性,它大多用来设定一些与应用相关的信息,如无需要也可以不用设置。key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类,而这个key可以让消息再进行二次归类,同一个key的消息会被划分到同一个分区中,有key的消息还可以支持日志压缩的功能。value是指消息体,一般不为空,如果为空则表示特定的消息—墓碑消息。timestamp是指消息的时间戳,它有CreateTime和LogAppendTime两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间。
必要的参数配置
在Kafka生产者客户端KafkaProducer中有3个参数是必填的:
- bootstrap.servers:该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。注意这里并非需要所有的broker地址,因为生产者会从给定的broker里查找到其他broker的信息。不过建议至少要设置两个以上的broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka集群上。
- key.serializer 和 value.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在。key.serializer和value.serializer这两个参数分别用来指定key和value序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名,如org.apache.kafka.common.serialization.StringSerializer。
KafkaProducer中的参数众多,我们可以直接使用客户端中的org.apache.kafka.clients.producer.ProducerConfig类来做一定程度上的预防措施,每个参数在 ProducerConfig 类中都有对应的名称。
KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。
发送消息
创建生产者实例和构建消息之后,就可以开始发送消息了。发送消息主要有三种模式:**发后即忘(fire-and-forget)、同步(sync)及异步(async)**。
KafkaProducer 的 send()方法并非是 void 类型,而是 Future<RecordMetadata>类型,send()方法有2个重载方法,具体定义如下:
KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。retries参数的默认值为0。
同步发送的方式可靠性高,要么消息被发送成功,要么发生异常。如果发生异常,则可以捕获并进行相应的处理,而不会像“发后即忘”的方式直接造成消息的丢失。不过同步发送的方式的性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条。
我们再来了解一下异步发送的方式,一般是在send()方法里指定一个Callback的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。有读者或许会有疑问,send()方法的返回值类型就是Future,而Future本身就可以用作异步的逻辑处理。这样做不是不行,只不过Future里的 get()方法在何时调用,以及怎么调用都是需要面对的问题,消息不停地发送,那么诸多消息对应的Future对象的处理难免会引起代码处理逻辑的混乱。使用Callback的方式非常简洁明了,Kafka有响应时就会回调,要么发送成功,要么抛出异常。
对于同一个分区而言,如果消息record1于record2之前先发送(参考上面的示例代码),那么KafkaProducer就可以保证对应的callback1在callback2之前调用,也就是说,回调函数的调用也可以保证分区有序。
通常,一个KafkaProducer不会只负责发送单条消息,更多的是发送多条消息,在发送完这些消息之后,需要调用KafkaProducer的close()方法来回收资源。close()方法会阻塞等待之前所有的发送请求完成后再关闭KafkaProducer。与此同时,KafkaProducer还提供了一个带超时时间的close()方法。
序列化
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象(所以说如果生产者和消费者序列化方式不一样,就无法正确的解析消息)。
如果 Kafka 客户端提供的几种序列化器都无法满足应用需求,则可以选择使用如 Avro、JSON、Thrift、ProtoBuf和Protostuff等通用的序列化工具来实现,或者使用自定义类型的序列化器来实现。
分区器
消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。消息经过序列化之后就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。
如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值,分区器的作用就是为消息分配分区。
Kafka中提供的默认分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了org.apache.kafka.clients.producer.Partitioner接口,如图所示:
其中partition()方法用来计算分区号,返回值为int类型。partition()方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。close()方法在关闭分区器的时候用来回收一些资源。
在默认分区器 DefaultPartitioner 的实现中,close()是空方法,而在 partition()方法中定义了主要的分区分配逻辑。如果key 不为 null,那么默认的分区器会对 key 进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区。如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。
在不改变主题分区数量的情况下,key与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,那么就难以保证key与分区之间的映射关系了。
了使用 Kafka 提供的默认分区器进行分区分配,还可以使用自定义的分区器,只需同DefaultPartitioner一样实现Partitioner接口即可。默认的分区器在key为null时不会选择非可用的分区,我们可以通过自定义的分区器DemoPartitioner来打破这一限制。
生产者拦截器
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
生产者拦截器的使用也很方便,主要是自定义实现org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口中包含3个方法。KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作,KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,优先于用户设定的 Callback 之前执行。
KafkaProducer中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)。
在拦截链中,如果某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。(拦截器失败一个不影响其他的执行)
原理分析
整体架构
生产者客户端的整体架构图:
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器) 中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为 33554432B,即 32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。
主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个 ProducerRecord。通俗地说,ProducerRecord 是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧凑。与此同时,将较小的ProducerRecord拼凑成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch和消息的具体格式有关,如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。
消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数来指定,默认值为16384B,即16KB。我们可以适当地调大batch.size参数以便多缓存一些消息。
Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node,List< ProducerBatch>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
在转换成<Node,List<ProducerBatch>>的形式之后,Sender 还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的ProduceRequest。
请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。与此同时,InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection,默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较Deque<Request>的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。
元数据的更新
InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的,未确认的请求越多则认为负载越大。对于图中的InFlightRequests 来说,图中展示了三个节点Node0、Node1和Node2,很明显Node1的负载最小。也就是说,Node1为当前的leastLoadedNode。选择leastLoadedNode发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。
KafkaProducer要将此消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定)目标分区,之后KafkaProducer需要知道目标分区的leader副本所在的broker 节点的地址、端口等信息才能建立连接,最终才能将消息发送到 Kafka,在这一过程中所需要的信息都属于元数据信息。
我们了解了bootstrap.servers参数只需要配置部分broker节点的地址即可,不需要配置所有broker节点的地址,因为客户端可以自己发现其他broker节点的地址,这一过程也属于元数据相关的更新操作。与此同时,分区数量及leader副本的分布都会动态地变化,客户端也需要动态地捕捉这些变化。
元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms 时间没有更新元数据都会引起元数据的更新操作。客户端参数metadata.max.age.ms的默认值为300000,即5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障。
重要的生产者参数
下面挑选一些重要的参数进行讲解:
acks
这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks 是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。acks参数有3种类型的值(都是字符串类型)。
- acks=1。默认值即为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入leader副本,比如在leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入leader副本并返回成功响应给生产者,且在被其他follower副本拉取之前leader副本崩溃,那么此时消息还是会丢失,因为新选举的leader副本中并没有这条对应的消息。acks设置为1,是消息可靠性和吞吐量之间的折中方案。
- acks=0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入Kafka的过程中出现某些异常,导致Kafka并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks 设置为 0 可以达到最大的吞吐量。
- acks=-1或acks=all。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为-1(all)可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR中可能只有leader副本,这样就退化成了acks=1的情况。要获得更高的消息可靠性需要配合 min.insync.replicas 等参数的联动。
max.request.size
这个参数用来限制生产者客户端能发送的消息的最大值,默认值为1048576B,即 1MB。一般情况下,这个默认值就可以满足大多数的应用场景了。并不建议盲目地增大这个参数的配置值,尤其是在对Kafka整体脉络没有足够把控的时候。因为这个参数还涉及一些其他参数的联动,比如broker端的message.max.bytes参数,如果配置错误可能会引起一些不必要的异常。比如将broker端的message.max.bytes参数配置为10,而max.request.size参数配置为20,那么当我们发送一条大小为15B的消息时,生产者客户端就会报出异常。
retries和retry.backoff.ms
retries参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置retries大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过max.request.size参数配置的值时,这种方式就不可行了。
重试还和另一个参数retry.backoff.ms有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retries 和 retry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。
如果将acks参数配置为非零值,并且max.in.flight.requests.per.connection参数配置为大于1的值,那么就会出现错序的现象:如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试发送第一批次的消息,此时如果第一批次的消息写入成功,那么这两个批次的消息就出现了错序。一般而言,在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection配置为1,而不是把acks配置为0,不过这样也会影响整体的吞吐。
compression.type
这个参数用来指定消息的压缩方式,默认值为“none”,即默认情况下,消息不会被压缩。该参数还可以配置为“gzip”“snappy”和“lz4”。对消息进行压缩可以极大地减少网络传输量、降低网络I/O,从而提高整体的性能。
connections.max.idle.ms
这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。
linger.ms
这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入ProducerBatch 的时间,默认值为 0。生产者客户端会在 ProducerBatch 被填满或等待时间超过linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。这个linger.ms参数与TCP协议中的Nagle算法有异曲同工之妙。
receive.buffer.bytes
这个参数用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为32768(B),即32KB。如果设置为-1,则使用操作系统的默认值。如果Producer与Kafka处于不同的机房,则可以适地调大这个参数值。
send.buffer.bytes
这个参数用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。
request.timeout.ms
这个参数用来配置Producer等待请求响应的最长时间,默认值为30000(ms)。请求超时之后可以选择进行重试。注意这个参数需要比broker端参数replica.lag.time.max.ms的值要大,这样可以减少因客户端重试而引起的消息重复的概率。
消费者
消费者与消费组
消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。每个消费者只能消费所分配到的分区中的消息。换言之,每一个分区只能被一个消费组中的一个消费者所消费。
消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加(或减少)消费者的个数来提高(或降低)整体的消费能力。对于分区数固定的情况,一味地增加消费者并不会让消费能力一直得到提升,如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。(感觉这种设计纯粹是为了保证分区消息的有序性)。以上分配逻辑都是基于默认的分区分配策略进行分析的,可以通过消费者客户端参数partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。
消费组是一个逻辑上的概念,它将旗下的消费者归为一类,每一个消费者只隶属于一个消费组。每一个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称,这个可以通过消费者客户端参数group.id来配置,默认值为空字符串。消费者并非逻辑上的概念,它是实际的应用实例,它可以是一个线程,也可以是一个进程。同一个消费组内的消费者既可以部署在同一台机器上,也可以部署在不同的机器上。
客户端开发
必要的参数配置
在Kafka消费者客户端KafkaConsumer中有4个参数是必填的:
- bootstrap.servers:该参数的释义和生产者客户端KafkaProducer 中的相同,用来 指 定 连 接 Kafka 集 群 所 需 的broker 地 址 清 单,具 体 内 容 形 式 为host1:port1,host2:post,可以设置一个或多个地址,中间用逗号隔开,此参数的默认值为“”。注意这里并非需要设置集群中全部的broker地址,消费者会从现有的配置中查找到全部的Kafka集群成员。这里设置两个以上的broker地址信息,当其中任意一个宕机时,消费者仍然可以连接到Kafka集群上
- group.id:消费者隶属的消费组的名称,默认值为“”。如果设置为空,则会报出异常:Exception in thread "main"org.apache.kafka.common.errors.InvalidGroupIdException:The configured groupId is invalid。一般而言,这个参数需要设置成具有一定的业务意义的名称。
- key.deserializer 和 value.deserializer:与生产者客户端KafkaProducer中的key.serializer和value.serializer参数对应。消费者从broker端获取的消息格式都是字节数组(byte[])类型,所以需要执行相应的反序列化操作才能还原成原有的对象格式。这两个参数分别用来指定消息中key和value所需反序列化操作的反序列化器,这两个参数无默认值。注意这里必须填写反序列化器类的全限定名,比如示例中的org.apache.kafka.common.serialization.StringDeserializer,单单指定StringDeserializer是错误的
订阅主题与分区
在创建好消费者之后,我们就需要为该消费者订阅相关的主题了。一个消费者可以订阅一个或多个主题,我们可以使用subscribe()方法订阅了一个主题,对于这个方法而言,既可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。
对于消费者使用集合的方式(subscribe(Collection))来订阅主题而言,比较容易理解,订阅了什么主题就消费什么主题中的消息。如果前后两次订阅了不同的主题,那么消费者以最后一次的为准。
如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅,在之后的过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。在Kafka 和其他系统之间进行数据复制时,这种正则表达式的方式就显得很常见。(可以进行正则名称匹配)
消费者不仅可以通过KafkaConsumer.subscribe()方法订阅主题,还可以直接订阅某些主题的特定分区,在KafkaConsumer中还提供了一个assign()方法来实现这些功能。这个方法只接受一个参数partitions,用来指定需要订阅的分区集合。这里补充说明一下TopicPartition类,在Kafka的客户端中,它用来表示分区,这个类的部分内容如下所示:
TopicPartition类只有2个属性:topic和partition,分别代表分区所属的主题和自身的分区编号,这个类可以和我们通常所说的主题—分区的概念映射起来。KafkaConsumer 中的partitionsFor()方法可以用来查询指定主题的元数据信息。其中PartitionInfo类型即为主题的分区元数据信息,此类的主要结构如下:
PartitionInfo类中的属性topic表示主题名称,partition代表分区编号,leader代表分区的leader副本所在的位置,replicas代表分区的AR集合,inSyncReplicas代表分区的ISR集合,offlineReplicas代表分区的OSR集合。
既然有订阅,那么就有取消订阅,可以使用 KafkaConsumer 中的unsubscribe()方法来取消主题的订阅。这个方法既可以取消通过 subscribe(Collection)方式实现的订阅,也可以取消通过subscribe(Pattern)方式实现的订阅,还可以取消通过 assign(Collection)方式实现的订阅。注意,如果将subscribe(Collection)或assign(Collection)中的集合参数设置为空集合,那么作用等同于unsubscribe()方法,因为新的订阅会覆盖旧的。
通过 subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。而通过assign()方法订阅分区时,是不具备消费者自动均衡的功能的,其实这一点从assign()方法的参数中就可以看出端倪,两种类型的subscribe()都有ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。
消息消费
Kafka中的消费是基于拉模式的。消息的消费一般有两种模式:推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。
Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法,而poll()方法返回的是所订阅的主题(分区)上的一组消息。对于poll()方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空;如果订阅的所有分区中都没有可供消费的消息,那么poll()方法返回为空的消息集合。
注意到poll()方法里还有一个超时时间参数timeout,用来控制poll()方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞。注意这里 timeout 的类型是 Duration,它是JDK8中新增的一个与时间有关的类型。timeout的设置取决于应用程序对响应速度的要求,比如需要在多长时间内将控制权移交给执行轮询的应用线程。可以直接将timeout设置为0,这样poll()方法会立刻返回,而不管是否已经拉取到了消息。如果应用线程唯一的工作就是从Kafka中拉取并消费消息,则可以将这个参数设置为最大值Long.MAX_VALUE。
消费者消费到的每条消息的类型为ConsumerRecord(注意与ConsumerRecords的区别),这个和生产者发送的消息类型ProducerRecord相对应,不过ConsumerRecord中的内容更加丰富,具体的结构参考如下代码:
opic 和 partition 这两个字段分别代表消息所属主题的名称和所在分区的编号。offset 表示消息在所属分区的偏移量。timestamp 表示时间戳,与此对应的timestampType 表示时间戳的类型。timestampType 有两种类型:CreateTime 和LogAppendTime,分别代表消息创建的时间戳和消息追加到日志的时间戳。headers表示消息的头部内容。key 和 value 分别表示消息的键和消息的值,一般业务应用要读取的就是value,checksum是CRC32的校验值。
poll()方法的返回值类型是 ConsumerRecords,它用来表示一次拉取操作所获得的消息集,内部包含了若干ConsumerRecord,它提供了一个iterator()方法来循环遍历消息集内部的消息。
ConsumerRecords类提供了一个records(TopicPartition)方法来获取消息集中指定分区的消息。ConsumerRecords 类中并没提供与 partitions()类似的 topics()方法来查看拉取的消息集中所包含的主题列表,如果要按照主题维度来进行消费,那么只能根据消费者订阅主题时的列表来进行逻辑处理了。
到目前为止,可以简单地认为poll()方法只是拉取一下消息而已,但就其内部逻辑而言并不简单,它涉及消费位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容,在后面的章节中会循序渐进地介绍这些内容。
位移提交
对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个offset的概念,消费者使用offset来表示消费到分区中某个消息所在的位置。单词“offset”可以翻译为“偏移量”,也可以翻译为“位移”,读者可能并没有过多地在意这一点:在很多中文资料中都会交叉使用“偏移量”和“位移”这两个词,并没有很严谨地进行区分。笔者对offset做了一些区分:对于消息在分区中的位置,我们将offset称为“偏移量”;对于消费者消费到的位置,将offset 称为“位移”,有时候也会更明确地称之为“消费位移”。
在每次调用poll()方法时,它返回的是还没有被消费过的消息集(当然这个前提是消息已经存储在Kafka 中了,并且暂不考虑异常情况的发生),要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。
在新消费者客户端中,消费位移存储在Kafka内部的主题__consumer_offsets中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。不过需要非常明确的是,当前消费者需要提交的消费位移并不是x,而是 x+1。读者可能看过一些相关资料,里面所讲述的内容可能是提交的消费位移就是当前所消费到的消费位移,即提交的是x,这明显是错误的。类似的错误还体现在对LEO(Log End Offset)的解读上。
KafkaConsumer 类提供了 position(TopicPartition)和committed(TopicPartition)两个方法来分别获取上面所说的position和committed offset的值。
对于位移提交的具体时机的把握也很有讲究,有可能会造成重复消费和消息丢失的现象。当前一次poll()操作所拉取的消息集为[x+2,x+7],x+2代表上一次提交的消费位移,说明已经完成了x+1之前(包括x+1在内)的所有消息的消费,x+5表示当前正在处理的位置。如果拉取到消息之后就进行了位移提交,即提交了x+8,那么当前消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+8开始的。也就是说,x+5至x+7之间的消息并未能被消费,如此便发生了消息丢失的现象。再考虑另外一种情形,位移提交的动作是在消费完所有拉取到的消息之后才执行的,那么当消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+2开始的。也就是说,x+2至x+4之间的消息又重新消费了一遍,故而又发生了重复消费的现象。
在 Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit 配置,默认值为 true。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值为5秒,此参数生效的前提是enable.auto.commit参数为true。
在默认的方式下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。
在Kafka消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题。假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。
自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象,但是在编程的世界里异常无可避免,与此同时,自动位移提交也无法做到精确的位移管理。在Kafka中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费,手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。开启手动提交功能的前提是消费者客户端参数enable.auto.commit配置为false。
手动提交可以细分为同步提交和异步提交,对应于KafkaConsumer 中的 **commitSync()和commitAsync()**两种类型的方法。我们这里先讲述同步提交的方式:
commitSync()方法会根据poll()方法拉取的最新位移来进行提交,只要没有发生不可恢复的错误(Unrecoverable Error),它就会阻塞消费者线程直至位移提交完成。对于不可恢复的错误,比如CommitFailedException、WakeupException、InterruptException、AuthenticationException、AuthorizationException等,我们可以将其捕获并做针对性的处理。
对于采用 commitSync()的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用commitSync()的另一个含参方法,该方法提供了一个 offsets 参数,用来提交指定分区的位移。无参的 commitSync()方法只能提交当前批次对应的 position 值。如果需要提交一个中间值,比如业务每消费一条消息就提交一次位移,那么就可以使用这种方式。(这个是最不会出错的方式了,同时也是消费性能最低的方式)。
与commitSync()方法相反,异步提交的方式(commitAsync())在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以使消费者的性能得到一定的增强。commitAsync方法有三个不同的重载方法,具体定义如下:
第一个无参的方法和第三个方法中的offsets都很好理解,对照commitSync()方法即可。关键的是这里的第二个方法和第三个方法中的callback参数,它提供了一个异步提交的回调方法,当位移提交完成后会回调 OffsetCommitCallback 中的 onComplete()方法。
commitAsync()提交的时候同样会有失败的情况发生,那么我们应该怎么处理呢?读者有可能想到的是重试,问题的关键也就在这里了。如果某一次异步提交的消费位移为 x,但是提交失败了,然后下一次又异步提交了消费位移为 x+y,这次成功了。如果这里引入了重试机制,前一次的异步提交的消费位移在重试的时候提交成功了,那么此时的消费位移又变为了 x。如果此时发生异常(或者再均衡),那么恢复之后的消费者(或者新的消费者)就会从x处开始消费消息,这样就发生了重复消费的问题。(所以说重要业务重试最好设计一个重试队列,不一定还是消息队列,自己设计一个补偿机制即可)。
为此我们可以设置一个递增的序号来维护异步提交的顺序,每次位移提交之后就增加序号相对应的值。在遇到位移提交失败需要重试的时候,可以检查所提交的位移和序号的值的大小,如果前者小于后者,则说明有更大的位移已经提交了,不需要再进行本次重试;如果两者相同,则说明可以进行重试提交。除非程序编码错误,否则不会出现前者大于后者的情况。
如果位移提交失败的情况经常发生,那么说明系统肯定出现了故障,在一般情况下,位移提交失败的情况很少发生,不重试也没有关系,后面的提交也会有成功的。重试会增加代码逻辑的复杂度,不重试会增加重复消费的概率。如果消费者异常退出,那么这个重复消费的问题就很难避免,因为这种情况下无法及时提交消费位移;如果消费者正常退出或发生再均衡的情况,那么可以在退出或再均衡执行之前使用同步提交的方式做最后的把关。
控制或关闭消费
KafkaConsumer 提供了对消费速度进行控制的方法,在有些应用场景下我们可能需要暂停某些分区的消费而先消费其他分区,当达到一定条件时再恢复这些分区的消费。KafkaConsumer中使用**pause()和resume()**方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据的操作。KafkaConsumer还提供了一个无参的paused()方法来返回被暂停的分区集合。
指定位移消费
在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费,这个参数的默认值为“latest”,表示从分区末尾开始消费消息。如果将auto.offset.reset参数配置为 “earliest”,那么消费者会从起始处,也就是0开始消费。auto.offset.reset参数还有一个可配置的值—“none”,配置为此值就意味着出现查到不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出NoOffsetForPartitionException异常。
如果能够找到消费位移,那么配置为“none”不会出现任何异常。如果配置的不是“latest”、“earliest”和“none”,则会报出ConfigException异常。
提供的auto.offset.reset 参数也只能在找不到消费位移或位移越界的情况下粗粒度地从开头或末尾开始消费。有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而KafkaConsumer 中的 seek()方法正好提供了这个功能,让我们得以追前消费或回溯消费。
seek()方法中的参数partition表示分区,而offset参数用来指定从分区的哪个位置开始消费。seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll()方法的调用过程中实现的。也就是说,在执行seek()方法之前需要先执行一次poll()方法,等到分配到分区之后才可以重置消费位置。
如果对未分配到的分区执行seek()方法,那么会报出IllegalStateException的异常。类似在调用subscribe()方法之后直接调用seek()方法。
如果消费组内的消费者在启动的时候能够找到消费位移,除非发生位移越界,否则auto.offset.reset参数并不会奏效,此时如果想指定从开头或末尾开始消费,就需要seek()方法的帮助了(或者干脆换一个消费组的名称也可以临时解决这个问题)。
beginningOffsets()方法中的参数内容和含义都与 endOffsets()方法中的一样,配合这两个方法我们就可以从分区的开头或末尾开始消费。其实KafkaConsumer中直接提供了seekToBeginning()方法和seekToEnd()方法来实现这两个功能。
有时候我们并不知道特定的消费位置,却知道一个相关的时间点,比如我们想要消费昨天8点之后的消息,这个需求更符合正常的思维逻辑。此时我们无法直接使用seek()方法来追溯到相应的位置。KafkaConsumer同样考虑到了这种情况,它提供了一个offsetsForTimes()方法,通过timestamp来查询与此对应的分区位置。
offsetsForTimes()方法的参数timestampsToSearch是一个Map类型,key为待查询的分区,而 value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,对应于OffsetAndTimestamp中的offset和timestamp字段。
seek()方法为我们提供了从特定位置读取消息的能力,我们可以通过这个方法来向前跳过若干消息,也可以通过这个方法来向后回溯若干消息,这样为消息的消费提供了很大的灵活性。seek()方法也为我们提供了将消费位移保存在外部存储介质中的能力,还可以配合再均衡监听器来提供更加精准的消费能力。
再均衡
再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。不过在再均衡发生期间,消费组内的消费者是无法读取消息的。也就是说,在再均衡发生期间的这一小段时间内,消费组会变得不可用。另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费。一般情况下,应尽量避免不必要的再均衡的发生。
消费者拦截器
消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作。与生产者拦截器对应的,消费者拦截器需要自定义实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口。
在消费者中也有拦截链的概念,和生产者的拦截链一样,也是按照interceptor.classes参数配置的拦截器的顺序来一一执行的(配置的时候,各个拦截器之间使用逗号隔开)。同样也要提防“副作用”的发生。如果在拦截链中某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。
多线程实现
KafkaProducer是线程安全的,然而KafkaConsumer却是非线程安全的。KafkaConsumer中定义了一个 acquire()方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出ConcurrentModifcationException异常,KafkaConsumer中的每个公用方法在执行所要执行的动作之前都会调用这个acquire()方法,只有wakeup()方法是个例外。
acquire()方法和我们通常所说的锁(synchronized、Lock等)不同,它不会造成阻塞等待,我们可以将其看作一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release()方法成对出现,表示相应的加锁和解锁操作。(cas的设计模式,判断线程数量直接)。
KafkaConsumer 非线程安全并不意味着我们在消费消息的时候只能以单线程的方式执行。如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费,造成了一定的延迟。除此之外,由于Kafka 中消息保留机制的作用,有些消息有可能在被消费之前就被清理了,从而造成消息的丢失。我们可以通过多线程的方式来实现消息消费,多线程的目的就是为了提高整体的消费能力。多线程的实现方式有多种,第一种也是最常见的方式:线程封闭,即为每个线程实例化一个KafkaConsumer对象。一个线程对应一个KafkaConsumer实例,我们可以称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种实现方式的并发度受限于分区的实际个数,根据之前介绍的消费者与分区数的关系,当消费线程的个数大于分区数时,就有部分消费线程一直处于空闲的状态。(这也是spring的kafka客户端采用的策略)。
与此对应的第二种方式是多个消费线程同时消费同一个分区,这个通过 assign()、seek()等方法实现,这样可以打破原有的消费线程的个数不能超过分区数的限制,进一步提高了消费的能力。不过这种实现方式对于位移提交和顺序控制的处理就会变得非常复杂,实际应用中使用得极少,笔者也并不推荐。一般而言,分区是消费线程的最小划分单位。(所以说如果预计数据量很大的话,又要求消费的速度,提前设置好合理的较大的分区数量)。
重要的消费者参数
fetch.min.bytes
该参数用来配置Consumer在一次拉取请求(调用poll()方法)中能从Kafka中拉取的最小数据量,默认值为1(B)。Kafka在收到Consumer的拉取请求时,如果返回给Consumer的数据量小于这个参数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小。可以适当调大这个参数的值以提高一定的吞吐量,不过也会造成额外的延迟(latency),对于延迟敏感的应用可能就不可取了。
fetch.max.bytes
它用来配置Consumer在一次拉取请求中从Kafka中拉取的最大数据量,默认值为 52428800(B),也就是 50MB。该参数设定的不是绝对的最大值,如果在第一个非空分区中拉取的第一条消息大于该值,那么该消息将仍然返回,以确保消费者继续工作。也就是说,上面问题的答案是可以正常消费。与此相关的,Kafka中所能接收的最大消息的大小通过服务端参数message.max.bytes(对应于主题端参数max.message.bytes)来设置。
fetch.max.wait.ms
这个参数也和fetch.min.bytes参数有关,如果Kafka仅仅参考fetch.min.bytes参数的要求,那么有可能会一直阻塞等待而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms参数用于指定Kafka的等待时间,默认值为500(ms)。如果Kafka中没有足够多的消息而满足不了fetch.min.bytes参数的要求,那么最终会等待500ms。这个参数的设定和Consumer与Kafka之间的延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数。
max.partition.fetch.bytes
这个参数用来配置从每个分区里返回给Consumer的最大数据量,默认值为1048576(B),即1MB。这个参数与 fetch.max.bytes 参数相似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者用来限制一次拉取中整体消息的大小。同样,如果这个参数设定的值比消息的大小要小,那么也不会造成无法消费,Kafka 为了保持消费逻辑的正常运转不会对此做强硬的限制。
max.poll.records
这个参数用来配置Consumer在一次拉取请求中拉取的最大消息数,默认值为500(条)。如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。
connections.max.idle.ms
这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。
exclude.internal.topics
Kafka中有两个内部的主题:__consumer_offsets和__transaction_state。exclude.internal.topics用来指定Kafka中的内部主题是否可以向消费者公开,默认值为true。如果设置为true,那么只能使用subscribe(Collection)的方式而不能使用subscribe(Pattern)的方式来订阅内部主题,设置为false则没有这个限制。
receive.buffer.bytes
这个参数用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为65536(B),即64KB。如果设置为-1,则使用操作系统的默认值。如果Consumer与Kafka处于不同的机房,则可以适当调大这个参数值。
send.buffer.bytes
这个参数用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。
request.timeout.ms
这个参数用来配置Consumer等待请求响应的最长时间,默认值为30000(ms)。
metadata.max.age.ms
这个参数用来配置元数据的过期时间,默认值为300000(ms),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的broker加入。
reconnect.backoff.ms
这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为50(ms)。这种机制适用于消费者向broker发送的所有请求。
retry.backoff.ms
这个参数用来配置尝试重新发送失败的请求到指定的主题分区之前的等待(退避)时间,避免在某些故障情况下频繁地重复发送,默认值为100(ms)。
isolation.level
这个参数用来配置消费者的事务隔离级别。字符串类型,有效值为“read_uncommitted”和“read_committed”,表示消费者所消费到的位置,如果设置为“read_committed”,那么消费者就会忽略事务未提交的消息,即只能消费到 LSO(LastStableOffset)的位置,默认情况下为“read_uncommitted”,即可以消费到HW(High Watermark)处的位置。
主题与分区
从Kafka的底层实现来说,主题和分区都是逻辑上的概念,分区可以有一至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等。
主题的管理
主题的管理包括创建主题、查看主题信息、修改主题和删除主题等操作。可以通过 Kafka提供的 kafka-topics.sh 脚本来执行这些操作,这个脚本位于$KAFKA_HOME/bin/目录下。主题的管理并非只有使用 kafka-topics.sh 脚本这一种方式,我们还可以通过KafkaAdminClient 的方式实现(这种方式实质上是通过发送 CreateTopicsRequest、DeleteTopicsRequest 等请求来实现的,甚至我们还可以通过直接操纵日志文件和ZooKeeper节点来实现。
创建主题
如果broker端配置参数auto.create.topics.enable设置为true(默认值就是true),那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions (默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会按照配置参数num.partitions和default.replication.factor的值来创建一个相应的主题。很多时候,这种自动创建主题的行为都是非预期的。除非有特殊应用需求,否则不建议将auto.create.topics.enable参数设置为true,这个参数会增加主题的管理与维护的难度。
更加推荐也更加通用的方式是通过kafka-topics.sh脚本来创建主题:
上面的示例中创建了一个分区数为 4、副本因子为 2 的主题。示例中的环境是一个包含 3个broker节点的集群。三个broker节点一共创建了8个文件夹,这个数字8实质上是分区数4与副本因子2的乘积。每个副本(或者更确切地说应该是日志,副本与日志一一对应)才真正对应了一个命名形式如<topic>-<partition>的文件夹。
主题、分区、副本和 Log(日志)的关系如图所示,主题和分区都是提供给上层用户的抽象,而在副本层面或更加确切地说是Log层面才有实际物理上的存在。同一个分区中的多个副本必须分布在不同的broker中,这样才能提供有效的数据冗余。对于示例中的分区数为4、副本因子为2、broker数为3的情况下,按照2、3、3的分区副本个数分配给各个broker是最优的选择。再比如在分区数为3、副本因子为3,并且broker数同样为3的情况下,分配3、3、3的分区副本个数给各个broker是最优的选择,也就是每个broker中都拥有所有分区的一个副本。
我们不仅可以通过日志文件的根目录来查看集群中各个broker的分区副本的分配情况,还可以通过ZooKeeper客户端来获取。当创建一个主题时会在ZooKeeper的/brokers/topics/目录下创建一个同名的实节点,该节点中记录了该主题的分区副本分配方案。
创建主题时对于主题名称的命名方式也很有讲究。首先是不能与已经存在的主题同名,如果创建了同名的主题就会报错。主题的命名同样不推荐(虽然可以这样做)使用双下画线“__”开头,因为以双下画线开头的主题一般看作Kafka的内部主题,比如__consumer_offsets和__transaction_state。主题的名称必须由大小写字母、数字、点号“.”、连接线“-”、下画线“_”组成,不能为空,不能只有点号“.”,也不能只有双点号“…”,且长度不能超过249。Kafka从0.10.x版本开始支持指定broker的机架信息(机架的名称)。如果指定了机架信息,则在分区副本分配时会尽可能地让分区副本分配到不同的机架上。指定机架信息是通过broker端参数broker.rack来配置的,比如配置当前broker所在的机架为“RACK1”:broker.rack=RACK1
。
分区副本的分配
在生产者和消费者中也都有分区分配的概念。生产者的分区分配是指为每条消息指定其所要发往的分区,消费者中的分区分配是指为消费者指定其可以消费消息的分区,而这里的分区分配是指为集群制定创建主题时的分区副本分配方案,即在哪个broker中创建哪些分区的副本。
之所以startIndex选择随机产生,是因为这样可以在多个主题的情况下尽可能地均匀分布分区副本,如果这里固定为一个特定值,那么每次的第一个副本都是在这个broker上,进而导致少数几个broker所分配到的分区副本过多而其余broker分配到的分区副本过少,最终导致负载不均衡。尤其是某些主题的副本数和分区数都比较少,甚至都为1的情况下,所有的副本都落到了那个指定的broker上。与此同时,在分配时位移量nextReplicaShift也可以更好地使分区副本分配得更加均匀。
分配副本时,除了处理第一个副本,其余的也调用 replicaIndex()方法来获得一个 broker,但这里和assignReplicasToBrokersRackUnaware()不同的是,这里不是简单地将这个broker添加到当前分区的副本列表之中,还要经过一层筛选,满足以下任意一个条件的broker不能被添加到当前分区的副本列表之中:
- 如果此broker所在的机架中已经存在一个broker拥有该分区的副本,并且还有其他的机架中没有任何一个broker拥有该分区的副本。
- 如果此broker中已经拥有该分区的副本,并且还有其他broker中没有该分区的副本。
查看主题
kafka-topics.sh脚本有5种指令类型:create、list、describe、alter和delete。其中list和describe指令可以用来方便地查看主题信息。
在使用 describe 指令查看主题信息时还可以额外指定 topics-with-overrides、under-replicated-partitions和unavailable-partitions这三个参数来增加一些附加功能。
增加topics-with-overrides参数可以找出所有包含覆盖配置的主题,它只会列出包含了与集群不一样配置的主题。注意使用topics-with-overrides参数时只显示原本只使用describe指令的第一行信息
under-replicated-partitions和unavailable-partitions参数都可以找出有问题的分区。通过 under-replicated-partitions 参数可以找出所有包含失效副本的分区。包含失效副本的分区可能正在进行同步操作,也有可能同步发生异常,此时分区的ISR集合小于 AR 集合。对于通过该参数查询到的分区要重点监控,因为这很可能意味着集群中的某个broker已经失效或同步效率降低等
修改主题
当一个主题被创建之后,依然允许我们对其做一定的修改,比如修改分区个数、修改配置等,这个修改的功能就是由kafka-topics.sh脚本中的alter指令提供的。
当主题中的消息包含key时(即key不为null),根据key计算分区的行为就会受到影响。当topic-config的分区数为1时,不管消息的key为何值,消息都会发往这一个分区;当分区数增加到3时,就会根据消息的key来计算分区号,原本发往分区0的消息现在有可能会发往分区1或分区2。如此还会影响既定消息的顺序,所以在增加分区数时一定要三思而后行。对于基于key计算的主题而言,建议在一开始就设置好分区数量,避免以后对其进行调整。
注意到在变更(增、删、改)配置的操作执行之后都会提示一段告警信息,指明了使用kafka-topics.sh脚本的alter指令来变更主题配置的功能已经过时(deprecated),将在未来的版本中删除,并且推荐使用kafka-configs.sh脚本来实现相关功能。
目前Kafka只支持增加分区数而不支持减少分区数。比如我们再将主题topic-config的分区数修改为1,就会报出InvalidPartitionException的异常。
为什么不支持减少分区?按照Kafka现有的代码逻辑,此功能完全可以实现,不过也会使代码的复杂度急剧增大。实现此功能需要考虑的因素很多,比如删除的分区中的消息该如何处理?如果随着分区一起消失则消息的可靠性得不到保障;如果需要保留则又需要考虑如何保留。直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于Spark、Flink这类需要消息时间戳(事件时间)的组件将会受到影响;如果分散插入现有的分区,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障?与此同时,顺序性问题、事务性问题,以及分区和副本的状态机切换问题都是不得不面对的。反观这个功能的收益点却是很低的,如果真的需要实现此类功能,则完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去即可。
配置管理
kafka-configs.sh 脚本是专门用来对配置进行操作的,这里的操作是指在运行状态下修改原有的配置,如此可以达到动态变更的目的。kafka-configs.sh脚本包含变更配置alter和查看配置describe这两种指令类型。同使用kafka-topics.sh脚本变更配置的原则一样,增、删、改的行为都可以看作变更操作,不过kafka-configs.sh脚本不仅可以支持操作主题相关的配置,还可以支持操作broker、用户和客户端这3个类型的配置。
kafka-configs.sh脚本使用entity-type参数来指定操作配置的类型,并且使用entity-name参数来指定操作配置的名称。比如查看主题topic-config的配置可以按如下方式执行:
–describe指定了查看配置的指令动作,–entity-type指定了查看配置的实体类型,–entity-name指定了查看配置的实体名称。entity-type只可以配置4个值:topics、brokers、clients和users。
使用alter指令变更配置时,需要配合add-config和delete-config这两个参数一起使用。add-config参数用来实现配置的增、改,即覆盖原有的配置;delete-config参数用来实现配置的删,即删除被覆盖的配置以恢复默认值。
使用kafka-configs.sh脚本来变更(alter)配置时,会在ZooKeeper中创建一个命名形式为/config/<entity-type>/<entity-name>的节点,并将变更的配置写入这个节点,比如对于主题topic-config而言,对应的节点名称为/config/topics/topic-config。
删除主题
如果确定不再使用一个主题,那么最好的方式是将其删除,这样可以释放一些资源,比如磁盘、文件句柄等。kafka-topics.sh脚本中的delete指令就可以用来删除主题。
可以看到在执行完删除命令之后会有相关的提示信息,这个提示信息和broker端配置参数delete.topic.enable 有关。必须将delete.topic.enable参数配置为true才能够删除主题,这个参数的默认值就是true,如果配置为false,那么删除主题的操作将会被忽略。在实际生产环境中,建议将这个参数的值设置为true。
使用kafka-topics.sh脚本删除主题的行为本质上只是在ZooKeeper中的/admin/delete_topics 路径下创建一个与待删除主题同名的节点,以此标记该主题为待删除的状态。与创建主题相同的是,真正删除主题的动作也是由Kafka的控制器负责完成的。
注意,删除主题是一个不可逆的操作。一旦删除之后,与其相关的所有消息数据会被全部删除,所以在执行这一操作的时候也要三思而后行。
初识KafkaAdminClient
一般情况下,我们都习惯使用kafka-topics.sh脚本来管理主题,但有些时候我们希望将主题管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台,那么就需要以程序调用API的方式去实现。本节主要介绍KafkaAdminClient的基本使用方式,以及采用这种调用API方式下的创建主题时的合法性验证。
基本使用
KafkaAdminClient不仅可以用来管理broker、配置和ACL(Access Control List),还可以用来管理主题。KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient抽象类,并提供了多种方法:
- 创建主题:CreateTopicsResult createTopics(Collection<NewTopic>newTopics)
- 删除主题:DeleteTopicsResult deleteTopics(Collection<String>topics
- 列出所有可用的主题:ListTopicsResult listTopics()
- 查看主题的信息:DescribeTopicsResult describeTopics(Collection<String>topicNames)
- 查询配置信息:DescribeConfigsResult describeConfigs(Collection<ConfigResource>resources)
- 修改配置信息:AlterConfigsResult alterConfigs(Map<ConfigResource,Config>configs)
- 增加分区:CreatePartitionsResult createPartitions(Map<String,NewPartitions>newPartitions)
分区的管理
优先副本的选举
分区使用多副本机制来提升可靠性,但只有leader副本对外提供读写服务,而follower副本只负责在内部进行消息的同步。如果一个分区的leader副本不可用,那么就意味着整个分区变得不可用,此时就需要Kafka从剩余的follower副本中挑选一个新的leader副本来继续对外提供服务。虽然不够严谨,但从某种程度上说,broker 节点中 leader 副本个数的多少决定了这个节点负载的高低。
在创建主题的时候,该主题的分区及副本会尽可能均匀地分布到Kafka 集群的各个broker节点上,对应的leader副本的分配也比较均匀。
针对同一个分区而言,同一个broker节点中不可能出现它的多个副本,即Kafka集群的一个broker中最多只能有它的一个副本,我们可以将leader副本所在的broker节点叫作分区的leader节点,而follower副本所在的broker节点叫作分区的follower节点。
随着时间的更替,Kafka 集群的broker 节点不可避免地会遇到宕机或崩溃的问题,当分区的leader节点发生故障时,其中一个follower节点就会成为新的leader节点,这样就会导致集群的负载不均衡,从而影响整体的健壮性和稳定性。当原来的leader节点恢复之后重新加入集群时,它只能成为一个新的follower节点而不再对外提供服务。
为了能够有效地治理负载失衡的情况,Kafka引入了优先副本(preferred replica)的概念。所谓的优先副本是指在 AR 集合列表中的第一个副本。 比如上面主题 topic-partitions 中分区 0的AR集合列表(Replicas)为[1,2,0],那么分区0的优先副本即为1。理想情况下,优先副本就是该分区的leader副本,所以也可以称之为preferred leader。Kafka要确保所有主题的优先副本在Kafka集群中均匀分布,这样就保证了所有分区的leader均衡分布。如果leader分布过于集中,就会造成集群负载不均衡。
所谓的优先副本的选举是指通过一定的方式促使优先副本选举为leader副本,以此来促进集群的负载均衡,这一行为也可以称为“分区平衡”。需要注意的是,分区平衡并不意味着Kafka集群的负载均衡,因为还要考虑集群中的分区分配是否均衡。更进一步,每个分区的leader副本的负载也是各不相同的,有些leader副本的负载很高,比如需要承载TPS为30000的负荷,而有些leader副本只需承载个位数的负荷。也就是说,就算集群中的分区分配均衡、leader 分配均衡,也并不能确保整个集群的负载就是均衡的,还需要其他一些硬性的指标来做进一步的衡量。
在 Kafka 中可以提供分区自动平衡的功能,与此对应的 broker 端参数是 auto.leader.rebalance.enable,此参数的默认值为true,即默认情况下此功能是开启的。如果开启分区自动平衡的功能,则Kafka 的控制器会启动一个定时任务,这个定时任务会轮询所有的broker节点,计算每个broker节点的分区不平衡率(broker中的不平衡率=非优先副本的leader个数/分区总数)是否超过leader.imbalance.per.broker.percentage参数配置的比值,默认值为 10%,如果超过设定的比值则会自动执行优先副本的选举动作以求分区平衡。执行周期由参数leader.imbalance.check.interval.seconds控制,默认值为300秒,即5分钟。
不过在生产环境中不建议将auto.leader.rebalance.enable设置为默认的true,因为这可能引起负面的性能问题,也有可能引起客户端一定时间的阻塞。因为执行的时间无法自主掌控,如果在关键时期(比如电商大促波峰期)执行关键任务的关卡上执行优先副本的自动选举操作,势必会有业务阻塞、频繁超时之类的风险。前面也分析过,分区及副本的均衡也不能完全确保集群整体的均衡,并且集群中一定程度上的不均衡也是可以忍受的,为防止出现关键时期“掉链子”的行为,笔者建议还是将掌控权把控在自己的手中,可以针对此类相关的埋点指标设置相应的告警,在合适的时机执行合适的操作,而这个“合适的操作”就是指手动执行分区平衡。
Kafka中kafka-perferred-replica-election.sh脚本提供了对分区leader副本进行重新平衡的功能。优先副本的选举过程是一个安全的过程,Kafka客户端可以自动感知分区leader副本的变更。
在实际生产环境中,一般使用 path-to-json-file 参数来分批、手动地执行优先副本的选举操作。尤其是在应对大规模的 Kafka 集群时,理应杜绝采用非 path-to-json-file参数的选举操作方式。同时,优先副本的选举操作也要注意避开业务高峰期,以免带来性能方面的负面影响。
分区重分配
当集群中的一个节点突然宕机下线时,如果节点上的分区是单副本的,那么这些分区就变得不可用了,在节点恢复前,相应的数据也就处于丢失状态;如果节点上的分区是多副本的,那么位于这个节点上的leader副本的角色会转交到集群的其他follower副本中。总而言之,这个节点上的分区副本都已经处于功能失效的状态,Kafka 并不会将这些失效的分区副本自动地迁移到集群中剩余的可用broker节点上,如果放任不管,则不仅会影响整个集群的均衡负载,还会影响整体服务的可用性和可靠性。
当集群中新增broker节点时,只有新创建的主题分区才有可能被分配到这个节点上,而之前的主题分区并不会自动分配到新加入的节点中,因为在它们被创建时还没有这个新节点,这样新节点的负载和原先节点的负载之间严重不均衡。
为了解决上述问题,需要让分区副本再次进行合理的分配,也就是所谓的分区重分配。Kafka提供了 kafka-reassign-partitions.sh 脚本来执行分区重分配的工作,它可以在集群扩容、broker节点失效的场景下对分区进行迁移。
kafka-reassign-partitions.sh 脚本的使用分为 3 个步骤:首先创建需要一个包含主题清单的JSON 文件,其次根据主题清单和broker 节点清单生成一份重分配方案,最后根据这份方案执行具体的重分配动作。除了让脚本自动生成候选方案,用户还可以自定义重分配方案,这样也就不需要执行第一步和第二步的操作了。
分区重分配的基本原理是先通过控制器为每个分区添加新副本(增加副本因子),新的副本将从分区的leader副本那里复制所有的数据。根据分区的大小不同,复制过程可能需要花一些时间,因为数据是通过网络复制到新副本上的。在复制完成之后,控制器将旧副本从副本清单里移除(恢复为原先的副本因子数)。注意在重分配的过程中要确保有足够的空间。
分区重分配对集群的性能有很大的影响,需要占用额外的资源,比如网络和磁盘。在实际操作中,我们将降低重分配的粒度,分成多个小批次来执行,以此来将负面的影响降到最低,这一点和优先副本的选举有异曲同工之妙。
还需要注意的是,如果要将某个broker下线,那么在执行分区重分配动作之前最好先关闭或重启broker。这样这个broker就不再是任何分区的leader节点了,它的分区就可以被分配给集群中的其他broker。这样可以减少broker间的流量复制,以此提升重分配的性能,以及减少对集群的影响。
复制限流
数据复制会占用额外的资源,如果重分配的量太大必然会严重影响整体的性能,尤其是处于业务高峰期的时候。减小重分配的粒度,以小批次的方式来操作是一种可行的解决思路。如果集群中某个主题或某个分区的流量在某段时间内特别大,那么只靠减小粒度是不足以应对的,这时就需要有一个限流的机制,可以对副本间的复制流量加以限制来保证重分配期间整体服务不会受太大的影响。
副本间的复制限流有两种实现方式:kafka-config.sh脚本和kafka-reassign-partitions.sh脚本。
kafka-config.sh脚本主要以动态配置的方式来达到限流的目的,在broker级别有两个与复制限流相关的配置参数:follower.replication.throttled.rate和leader.replication.throttled.rate,前者用于设置follower副本复制的速度,后者用于设置leader副本传输的速度,它们的单位都是B/s。
在主题级别也有两个相关的参数来限制复制的速度:leader.replication.throttled.replicas 和follower.replication.throttled.replicas,它们分别用来配置被限制速度的主题所对应的leader副本列表和follower副本列表。
修改副本因子
我们可以将其他分区的 replicas 内容也改成[0,1,2],这样每个分区的副本因子就都从 2增加到了3。注意增加副本因子时也要在log_dirs中添加一个“any”,这个log_dirs代表Kafka中的日志目录,对应于broker端的log.dir或log.dirs参数的配置值,如果不需要关注此方面的细节,那么可以简单地设置为“any”。
如何选择合适的分区数
分区数越多也会让Kafka的正常启动和关闭的耗时变得越长,与此同时,主题的分区数越多不仅会增加日志清理的耗时,而且在被删除时也会耗费更多的时间。对旧版的生产者和消费者客户端而言,分区数越多,也会增加它们的开销,不过这一点在新版的生产者和消费者客户端中有效地得到了抑制。
如果一定要给一个准则,则建议将分区数设定为集群中broker的倍数,即假定集群中有3个broker节点,可以设定分区数为3、6、9等,至于倍数的选定可以参考预估的吞吐量。(如果集群中的broker 节点数有很多,比如大几十或上百、上千,那么这种准则也不太适用,在选定分区数时进一步可以引入基架等参考因素。)
分区数量也不是越多越好,要有基于自己实际业务和性能基础测试的一个灵活判断
日志存储
文件目录布局
Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区,分区的数量可以在主题创建的时候指定,也可以在之后修改。每条消息在发送的时候会根据分区规则被追加到指定的分区中,分区中的每条消息都会被分配一个唯一的序列号,也就是通常所说的偏移量(offset)。
如果分区规则设置得合理,那么所有的消息可以均匀地分布到不同的分区中,这样就可以实现水平扩展。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka又引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以“.txnindex”为后缀的事务索引文件)。
向Log 中追加消息时是顺序写入的,只有最后一个 LogSegment 才能执行写入操作,在此之前所有的 LogSegment 都不能写入数据。为了方便描述,我们将最后一个 LogSegment 称为“activeSegment”,即表示当前活跃的日志分段。随着消息的不断写入,当activeSegment满足一定的条件时,就需要创建新的activeSegment,之后追加的消息将写入新的activeSegment。
为了便于消息的检索,每个LogSegment中的日志文件(以“.log”为文件后缀)都有对应的两个索引文件:偏移量索引文件(以“.index”为文件后缀)和时间戳索引文件(以“.timeindex”为文件后缀)。每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment中第一条消息的offset。偏移量是一个64位的长整型数,日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为20位数字,没有达到的位数则用0填充。比如第一个LogSegment的基准偏移量为0,对应的日志文件为00000000000000000000.log。
注意每个LogSegment中不只包含“.log”“.index”“.timeindex”这3种文件,还可能包含“.deleted”“.cleaned”“.swap”等临时文件,以及可能的“.snapshot”“.txnindex”“leader-epoch-checkpoint”等文件。
在某一时刻,Kafka 中的文件目录布局如图 5-2 所示。每一个根目录都会包含最基本的 4个检查点文件(xxx-checkpoint)和meta.properties 文件。在创建主题的时候,如果当前 broker中不止配置了一个根目录,那么会挑选分区数最少的那个根目录来完成本次创建任务。
日志格式的演变
对一个成熟的消息中间件而言,消息格式(或者称为“日志格式”)不仅关系功能维度的扩展,还牵涉性能维度的优化。随着Kafka 的迅猛发展,其消息格式也在不断升级改进,从0.8.x版本开始到现在的2.0.0版本,Kafka的消息格式也经历了3个版本:v0版本、v1版本和v2版本。
每个分区由内部的每一条消息组成,如果消息格式设计得不够精炼,那么其功能和性能都会大打折扣。比如有冗余字段,势必会不必要地增加分区的占用空间,进而不仅使存储的开销变大、网络传输的开销变大,也会使Kafka的性能下降。反观如果缺少字段,比如在最初的Kafka消息版本中没有timestamp字段,对内部而言,其影响了日志保存、切分策略,对外部而言,其影响了消息审计、端到端延迟、大数据应用等功能的扩展。虽然可以在消息体内部添加一个时间戳,但解析变长的消息体会带来额外的开销,而存储在消息体(参考图5-3中的value字段)前面可以通过指针偏移量获取其值而容易解析,进而减少了开销(可以查看v1版本),虽然相比于没有 timestamp 字段的开销会大一点。由此可见,仅在一个字段的一增一减之间就有这么多门道,那么Kafka具体是怎么做的呢?本节只针对Kafka 0.8.x之上(包含)的版本做相应说明,对于之前的版本不做陈述。
消息压缩
常见的压缩算法是数据量越大压缩效果越好,一条消息通常不会太大,这就导致压缩效果并不是太好。而Kafka实现的压缩方式是将多条消息一起进行压缩,这样可以保证较好的压缩效果。在一般情况下,生产者发送的压缩数据在broker中也是保持压缩状态进行存储的,消费者从服务端获取的也是压缩的消息,消费者在处理消息之前才会解压消息,这样保持了端到端的压缩。
Kafka 日志中使用哪种压缩方式是通过参数 compression.type 来配置的,默认值为“producer”,表示保留生产者使用的压缩方式。这个参数还可以配置为“gzip”“snappy”“lz4”,分别对应 GZIP、SNAPPY、LZ4 这 3 种压缩算法。如果参数compression.type 配置为“uncompressed”,则表示不压缩。
压缩消息,英文是compress message,Kafka中还有一个compact message,常常被人们直译成压缩消息,需要注意两者的区别。compact message是针对日志清理策略而言的(cleanup.policy=compact),是指日志压缩(Log Compaction)后的消息
变长字段
Kafka从0.11.0版本开始所使用的消息格式版本为v2,这个版本的消息相比v0和v1的版本而言改动很大,同时还参考了Protocol Buffer[1]而引入了变长整型(Varints)和ZigZag编码。为了更加形象地说明问题,首先我们来了解一下变长整型。
Varints是使用一个或多个字节来序列化整数的一种方法。数值越小,其占用的字节数就越少。Varints中的每个字节都有一个位于最高位的msb位(most significant bit),除最后一个字节外,其余msb位都设置为1,最后一个字节的msb位为0。这个msb位表示其后的字节是否和当前字节一起来表示同一个整数。除msb位外,剩余的7位用于存储数据本身,这种表示类型又称为Base 128。通常而言,一个字节8位可以表示256个值,所以称为Base 256,而这里只能用7位表示,2的7次方即128。Varints中采用的是小端字节序,即最小的字节放在最前面。
ZigZag编码以一种锯齿形(zig-zags)的方式来回穿梭正负整数,将带符号整数映射为无符号整数,这样可以使绝对值较小的负数仍然享有较小的Varints编码值,比如-1编码为1,1编码为2,-2编码为3。
前面说过Varints中的一个字节中只有7位是有效数值位,即只能表示128个数值,转变成绝对值之后其实质上只能表示64个数值。比如对消息体长度而言,其值肯定是大于等于0的正整数,那么一个字节长度的Varints最大只能表示64。
v2版本
v2版本中消息集称为Record Batch,而不是先前的Message Set,其内部也包含了一条或多条消息,消息的格式参见图的中部和右部。在消息压缩的情形下,Record Batch Header部分(参见图5-7左部,从first offset到records count字段)是不被压缩的,而被压缩的是records字段中的所有内容。生产者客户端中的ProducerBatch对应这里的RecordBatch,而ProducerRecord对应这里的Record。
先讲述消息格式Record的关键字段,可以看到内部字段大量采用了Varints,这样Kafka可以根据具体的值来确定需要几个字节来保存。v2版本的消息格式去掉了crc字段,另外增加了length(消息总长度)、timestamp delta(时间戳增量)、offset delta(位移增量)和headers信息,并且attributes字段被弃用了,笔者对此做如下分析(key、key length、value、value length字段同v0和v1版本的一样,这里不再赘述):
- length:消息总长度。
- attributes:弃用,但还是在消息格式中占据1B的大小,以备未来的格式扩展。
- timestamp delta:时间戳增量。通常一个timestamp需要占用8个字节,如果像这里一样保存与RecordBatch的起始时间戳的差值,则可以进一步节省占用的字节数。
- offset delta:位移增量。保存与 RecordBatch起始位移的差值,可以节省占用的字节数。
- headers:这个字段用来支持应用级别的扩展,而不需要像v0和v1版本一样不得不将一些应用级别的属性值嵌入消息体。Header的格式如图5-7最右部分所示,包含key和value,一个Record里面可以包含0至多个Header。
对于 v1 版本的消息,如果用户指定的 timestamp 类型是LogAppendTime 而不是CreateTime,那么消息从生产者进入broker 后,timestamp 字段会被更新,此时消息的 crc值将被重新计算,而此值在生产者中已经被计算过一次。再者,broker 端在进行消息格式转换时(比如v1版转成v0版的消息格式)也会重新计算crc的值。在这些类似的情况下,消息从生产者到消费者之间流动时,crc的值是变动的,需要计算两次crc的值,所以这个字段的设计在 v0 和 v1 版本中显得比较“鸡肋”。在 v2 版本中将 crc 的字段从 Record 中转移到了RecordBatch中。
v2版本对消息集(RecordBatch)做了彻底的修改,除了刚刚提及的crc字段,还多了如下字段:
- first offset:表示当前RecordBatch的起始位移。
- length:计算从partition leader epoch字段开始到末尾的长度。
- partition leader epoch:分区leader纪元,可以看作分区leader的版本号或更新次数
- magic:消息格式的版本号,对v2版本而言,magic等于2。
- attributes:消息属性,注意这里占用了两个字节。低3位表示压缩格式,可以参考v0和v1;第4位表示时间戳类型;第5位表示此RecordBatch是否处于事务中,0表示非事务,1表示事务。第6位表示是否是控制消息(ControlBatch),0表示非控制消息,而1表示是控制消息,控制消息用来支持事务功能。
- last offset delta:RecordBatch中最后一个Record的offset与first offset的差值。主要被broker用来确保RecordBatch中Record组装的正确性。
- first timestamp:RecordBatch中第一条Record的时间戳。
- max timestamp:RecordBatch 中最大的时间戳,一般情况下是指最后一个 Record的时间戳,和last offset delta的作用一样,用来确保消息组装的正确性。
- producer id:PID,用来支持幂等和事务。
- producer epoch:和producer id一样,用来支持幂等和事务。
- first sequence:和 producer id、producer epoch 一样,用来支持幂等和事务。
- records count:RecordBatch中Record的个数。
这么看上去v2版本的消息好像要比之前版本的消息所占用的空间大得多,的确对单条消息而言是这样的,如果我们连续向主题msg_format_v2中再发送10条value长度为6、key为null的消息。本来应该占用740B大小的空间,实际上只占用了191B,在v0版本中这10条消息需要占用320B的空间大小,而v1版本则需要占用400B的空间大小,这样看来v2版本又节省了很多空间,因为它将多个消息(Record)打包存放到单个RecordBatch中,又通过Varints编码极大地节省了空间。v2版本的消息不仅提供了更多的功能,比如事务、幂等性等,某些情况下还减少了消息的空间占用,总体性能提升很大。