在消息队列系统中,准确记录和管理消息的消费进度是保障系统可靠性和数据一致性的关键。RocketMQ 作为一款高性能、高可用的分布式消息队列,其 ConsumerOffsetManager
组件在消费进度管理方面发挥着至关重要的作用。本文将详细介绍 ConsumerOffsetManager
的主要功能、重要属性和核心方法。
1. 概述
ConsumerOffsetManager
是 RocketMQ Broker 端的一个重要组件,主要负责管理消费者的消费偏移量(offset)。消费偏移量是指消费者在消息队列中已经消费到的位置,通过管理这个偏移量,ConsumerOffsetManager
可以确保消费者在重启或故障恢复后能够从正确的位置继续消费消息,避免消息的重复消费或遗漏。
2. 主要属性
这是一个核心的属性,其类型为 ConcurrentMap<String, ConcurrentMap<Integer, Long>>
。它是一个双层的并发映射结构,外层的键是 topic@consumerGroup
的组合字符串,用于唯一标识一个主题和消费组的组合;内层的键是队列 ID(Integer
类型),值是该队列对应的消费偏移量(Long
类型)。这个属性存储了所有消费者在各个队列上的消费进度信息
3. 主要方法
ConsumerOffsetManager继承于ConfigManager,里面会应用到ConfigManager中的一一些方法
//随着consumer不断的从broker这里消费topic中的queue数据,此时需要进行记录consumer对topic中的每个queue的消费到了哪个位置 offset
//对消费偏移量的管理就是这个ConsumerOffsetManager组件
public class ConsumerOffsetManager extends ConfigManager {
}
3.1 load()
该方法用于从磁盘文件中加载偏移量信息到内存中的 offsetTable
中。在 Broker 启动时,会调用这个方法,将之前持久化的偏移量信息恢复到内存,以便继续管理消费者的消费进度。如果加载成功,方法返回 true
;否则返回 false
。
public boolean load() {String fileName = null;try {//获取到配置文件的地址 是从子类中进行获取配置文件的地址fileName = this.configFilePath();//读取文件内容位一个大的json字符串String jsonString = MixAll.file2String(fileName);if (null == jsonString || jsonString.length() == 0) {return this.loadBak();} else {//解码this.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}}
3.2persist()
persist
方法用于将内存中的偏移量信息持久化到磁盘文件中。ConsumerOffsetManager
会定期调用这个方法,将 offsetTable
中的数据写入到 configFilePath
指向的文件中,以确保数据的持久化和可靠性。
/*** 子类文件中的核心数据进行持久化操作* 每次进行持久化的时候,老文件的内容进行写入到.bak文件中做备份* 新文件的内容写入到.tmp文件中,老文件删除,tmp文件改名为新文件*/public synchronized void persist() {String jsonString = this.encode(true);if (jsonString != null) {String fileName = this.configFilePath();try {//把json文件写入到磁盘中MixAll.string2File(jsonString, fileName);} catch (IOException e) {log.error("persist file " + fileName + " exception", e);}}}
3.3 commitOffset()
此方法用于提交消费者的消费偏移量。当消费者成功消费一批消息后,会向 Broker 发送偏移量提交请求,Broker 会调用 commitOffset
方法将新的偏移量更新到 offsetTable
中。参数 group
是消费组名称,topic
是主题名称,queueId
是队列 ID,offset
是新的消费偏移量
/*** 提交offset(消费的偏移量)* @param clientHost 客户端地址 机器ip* @param group 消费者组* @param topic topic的名字* @param queueId topic下的队列id* @param offset 消费的偏移量*/public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,final long offset) {// topic@groupString key = topic + TOPIC_GROUP_SEPARATOR + group;this.commitOffset(clientHost, key, queueId, offset);}/*** 提交offset(消费的偏移量)* @param clientHost 客户端地址 机器ip* @param key offsetTable这个Map中key* @param queueId topic下的队列id* @param offset 消费的偏移量*/private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);if (null == map) {//如果是空 证明是第一次来进行提交offset 第一次的话就初始化一个Map 把消费的offset给放入到初始化的map中map = new ConcurrentHashMap<Integer, Long>(32);map.put(queueId, offset);this.offsetTable.put(key, map);} else {//不为空 更新一下消费者 对传入的queueId消费的偏移量Long storeOffset = map.put(queueId, offset);if (storeOffset != null && offset < storeOffset) {log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);}}}
3.4cloneOffset()
cloneOffset
方法的主要功能是将一个消费组在某个主题下的偏移量数据克隆到另一个消费组。在某些场景下,例如需要对消费进度进行备份或者将某个消费组的消费进度复制给新的消费组时,这个方法就非常有用。
/*** 克隆offset* @param srcGroup 源的消费者组* @param destGroup 目标的消费者组* @param topic topic的名字*/public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {ConcurrentMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);if (offsets != null) {this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets));}}
3.5removeOffset()
removeOffset
方法用于移除某个消费组在指定主题下的偏移量信息。当某个消费组不再需要消费某个主题的消息,或者需要清理过期的偏移量数据时,可以使用该方法。
public void removeOffset(final String group) {Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();while (it.hasNext()) {Entry<String, ConcurrentMap<Integer, Long>> next = it.next();String topicAtGroup = next.getKey();if (topicAtGroup.contains(group)) {String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);if (arrays.length == 2 && group.equals(arrays[1])) {it.remove();log.warn("clean group offset {}", topicAtGroup);}}}}
3.6 queryOffset()
此方法用于查询特定消费组在指定主题和队列上的消费偏移量。在 RocketMQ 集群消费模式下,它能让消费者清楚当前在某个队列消费到了什么位置,从而保证消息消费的连续性与准确性
public Map<Integer, Long> queryOffset(final String group, final String topic) {// topic@groupString key = topic + TOPIC_GROUP_SEPARATOR + group;return this.offsetTable.get(key);}
3.7queryMinOffsetInAllGroup()
这个方法用于查询某个主题下所有消费组在指定队列上的最小消费偏移量。在某些场景下,比如清理消息时,需要知道所有消费组的最小消费位置,以确保不会删除还未被消费的消息。
/*** 在所有的消费者组中查询最小的offset* 意思是 topic中每个queue被消费的最小的offset是什么* @param topic topic的名字* @param filterGroups 要排除的一写消费者组的信息* @return*/public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) {Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>();// 先从offsetTable中过滤掉一些不需要进行查询的数据Set<String> topicGroups = this.offsetTable.keySet();if (!UtilAll.isBlank(filterGroups)) {for (String group : filterGroups.split(",")) {Iterator<String> it = topicGroups.iterator();while (it.hasNext()) {if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) {it.remove();}}}}// 遍历offsetTablefor (Map.Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {String topicGroup = offSetEntry.getKey();String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);// 判断topic跟指定的查询的topic是否相等if (topic.equals(topicGroupArr[0])) {for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {//查询当前topic的队列的最小offsetlong minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, entry.getKey());// 如果消费者组里的offset大于等于最小的offsetif (entry.getValue() >= minOffset) {Long offset = queueMinOffset.get(entry.getKey());if (offset == null) {queueMinOffset.put(entry.getKey(), Math.min(Long.MAX_VALUE, entry.getValue()));} else {queueMinOffset.put(entry.getKey(), Math.min(entry.getValue(), offset));}}}}}return queueMinOffset;}
3.8scanUnsubscribedTopic()
在 RocketMQ 的实际运用中,消费者可能会动态地订阅或取消订阅某些主题。当一个主题不再被任何消费者订阅时,其对应的消费偏移量数据就不再有存在的必要。scanUnsubscribedTopic
方法会周期性地检查并清理这些不再使用的数据,避免数据冗余。
/*** 去扫描没有被消费者订阅的topic*/public void scanUnsubscribedTopic() {Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();while (it.hasNext()) {Entry<String, ConcurrentMap<Integer, Long>> next = it.next();String topicAtGroup = next.getKey();String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);if (arrays.length == 2) {// topic@group 第一个元素为topic 第二个元素为group(消费组)String topic = arrays[0];String group = arrays[1];// ConsumerManager consumer管理的组件 去看看当前消费组里是不是还有这个消费者连接过来对这个topic进行拉取和订阅if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic)//这个消费组之前还有消费者拉取数据的时候,消费的offset已经落后最新的offset已经很多了&& this.offsetBehindMuchThanData(topic, next.getValue())) {// 上面两个条件都满足的场景 这个消费组以前有消费者进行消费数据,但是现在没有消费者了,并且以前消费的offset已经落后了很多// 把这个topic@group的消费偏移量信息从内存中移除it.remove();log.warn("remove topic offset, {}", topicAtGroup);}}}}/*** 判断在当前的topic的消费偏移量信息是否已经落后了很多* @param topic topic的名字* @param table map结果,map的key为queueId,map的value为消费的offset* @return*/private boolean offsetBehindMuchThanData(final String topic, ConcurrentMap<Integer, Long> table) {Iterator<Entry<Integer, Long>> it = table.entrySet().iterator();boolean result = !table.isEmpty();while (it.hasNext() && result) {Entry<Integer, Long> next = it.next();//获取当前queueId在store(消息存储组件里)中的最小offsetlong minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, next.getKey());//某一个消费组对一个queue 消费的offset(偏移量)long offsetInPersist = next.getValue();result = offsetInPersist <= minOffsetInStore;}return result;}
4. 总结
ConsumerOffsetManager
是 RocketMQ 中一个非常重要的组件,它通过管理消费者的消费偏移量,确保了消息的准确消费和系统的可靠性。了解 ConsumerOffsetManager
的主要属性和方法,有助于开发者更好地理解 RocketMQ 的工作原理,优化消息消费的性能,以及处理各种异常情况。在实际应用中,合理配置 flushDelayOffsetInterval
等参数,可以在性能和数据可靠性之间找到最佳平衡点。