
RocketMQ and Kafka Retry Queues

2024/12/23 6:08:13  Source: https://blog.csdn.net/grant167/article/details/141305415

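To decouple services and process part of the logic asynchronously, our system adopts a message-driven approach. With a message queue, services communicate through events, which reduces direct dependencies and improves the system's responsiveness and reliability.

Why consider consumption retries?

In business development, exception handling and retry mechanisms are key to system stability and data consistency. For example, Spring Retry provides flexible retry policies that make method calls more robust. Likewise, when OpenFeign is used for remote service calls, its integrated Ribbon component can retry failed calls.

Retries matter just as much when consuming from a message queue. When message processing throws an exception, a sensible retry policy helps ensure that the business logic executes correctly and the data stays complete.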

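How do RocketMQ and Kafka handle retries?

RocketMQ (4.9.x)

Retry queue

If a Consumer fails to consume a message, RocketMQ redelivers the message to the Consumer after the retry interval. If the message still has not been consumed successfully once the maximum retry count is reached, it is delivered to the dead-letter queue.

Message retry only applies in clustering consumption mode. Broadcasting mode offers no failure retry: a failed message is not retried, and the consumer simply moves on to new messages.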

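  • Maximum retry count: the maximum number of times a message can be redelivered after a consumption failure.
consumer.setMaxReconsumeTimes(10);
  • Retry interval: how long to wait before a failed message is delivered to the Consumer again. This setting only takes effect for orderly consumption.
consumer.setSuspendCurrentQueueTimeMillis(5000);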

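Orderly and concurrent consumption retry differently. After a failure, orderly consumption retries locally on the client until the maximum retry count is reached, so the failed message is never skipped and the next message cannot be consumed out of order. Concurrent consumption instead sends the failed message back to the broker and waits for the broker to redeliver it; in the meantime the consumer keeps consuming the messages behind it in the queue.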
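The orderly behaviour described above can be illustrated with a small, library-free simulation. This is only a sketch under stated assumptions: `OrderlyRetrySketch`, its `handler` callback, and the log format are hypothetical names invented for illustration, and the real client waits `suspendCurrentQueueTimeMillis` between attempts instead of looping immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical simulation of orderly consumption: a failing message blocks
// the queue until it succeeds or its retries are exhausted.
final class OrderlyRetrySketch {

    /**
     * @param queue             messages in queue order
     * @param handler           returns true when (message, attempt) is consumed successfully
     * @param maxReconsumeTimes maximum number of local retries per message
     * @return an event log such as "B#0 fail", "B#1 ok"
     */
    static List<String> consumeInOrder(List<String> queue,
                                       BiPredicate<String, Integer> handler,
                                       int maxReconsumeTimes) {
        List<String> log = new ArrayList<>();
        for (String msg : queue) {
            int attempt = 0;
            while (true) {
                if (handler.test(msg, attempt)) {
                    log.add(msg + "#" + attempt + " ok");
                    break;
                }
                log.add(msg + "#" + attempt + " fail");
                if (attempt >= maxReconsumeTimes) {
                    // Retries exhausted: give up on this message and move on.
                    log.add(msg + " -> DLQ");
                    break;
                }
                // The real client would pause for suspendCurrentQueueTimeMillis here.
                attempt++;
            }
        }
        return log;
    }
}
```

With a handler that fails message B on its first two attempts, the log shows that C is not touched until B finally succeeds, which is exactly the ordering guarantee the paragraph describes.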

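A message that fails concurrent consumption is not sent back to its original Topic but to a special Topic named %RETRY%ConsumerGroupName. In clustering mode, every ConsumerGroup that consumes concurrently has one such Topic and subscribes to it automatically. The two modes differ in their parameters as follows: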

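| Consumption type | Retry interval | Maximum retry count |
| --- | --- | --- |
| Orderly consumption | Configurable through SuspendCurrentQueueTimeMillis. | Configurable through MaxReconsumeTimes; the value has no upper limit. If unset, the default is Integer.MAX_VALUE. |
| Concurrent consumption | Steps up with the retry count, in the range 1 second to 2 hours; not configurable. With the default delay level, the broker code shown later in this article starts at level 3, so the first retry comes after 10 seconds. | Configurable through MaxReconsumeTimes; the default is 16 and the value has no upper limit. The default is recommended. |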
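To make the stepped interval for concurrent consumption concrete, the sketch below maps a retry count onto the broker's delay-level table. It relies only on two facts that appear in the source code quoted later in this article: the default `messageDelayLevel` string and the rule `delayLevel = 3 + reconsumeTimes`. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: computes the retry delay for concurrent consumption
// from the default delay-level table quoted in this article.
final class RetryDelayTable {

    // Default value of MessageStoreConfig#messageDelayLevel, as quoted in the article.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Parses a level spec such as "1s 5s 1m" into milliseconds per level. */
    static List<Long> parseLevels(String spec) {
        List<Long> millis = new ArrayList<>();
        for (String token : spec.trim().split("\\s+")) {
            long value = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            switch (unit) {
                case 's': millis.add(value * 1_000L); break;
                case 'm': millis.add(value * 60_000L); break;
                case 'h': millis.add(value * 3_600_000L); break;
                default: throw new IllegalArgumentException("unknown unit: " + token);
            }
        }
        return millis;
    }

    /** Delay before the next delivery of a message that has already been retried reconsumeTimes times. */
    static long delayMillisForRetry(int reconsumeTimes) {
        List<Long> levels = parseLevels(DEFAULT_LEVELS);
        // Levels are 1-based; anything above the highest level is capped at it.
        int level = Math.min(3 + reconsumeTimes, levels.size());
        return levels.get(level - 1);
    }

    /** Sum of all delays when a message fails on every one of maxRetries retries. */
    static long totalDelayMillis(int maxRetries) {
        long total = 0;
        for (int i = 0; i < maxRetries; i++) {
            total += delayMillisForRetry(i);
        }
        return total;
    }
}
```

Under these assumptions the first retry waits 10 seconds, the second 30 seconds, and a message that fails all 16 default retries spends about 4 hours 45 minutes in retry before it is dead-lettered.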
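Dead-letter queue

When a message fails on its first consumption, RocketMQ retries it automatically. If consumption still fails after the maximum retry count is reached, the consumer evidently cannot consume this message under normal conditions. The message is not discarded immediately; instead it is sent to a special queue that belongs to the consumer. Such messages are called dead-letter messages (Dead-Letter Message), and the special queue that stores them is called the dead-letter queue (Dead-Letter Queue). The dead-letter queue is the single queue under the dead-letter Topic. When dead-letter messages are produced, the dead-letter Topic of the corresponding ConsumerGroup is named %DLQ%ConsumerGroupName, and messages in the dead-letter queue are not consumed again. Details of dead-letter messages can be looked up with the RocketMQ Admin tool or in RocketMQ Dashboard.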
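The choice between the retry Topic and the dead-letter Topic can be sketched without any RocketMQ dependency. The condition used here (`reconsumeTimes >= maxReconsumeTimes || delayLevel < 0`) and the two topic prefixes are taken from the broker code and naming rules quoted in this article; the class and method names are hypothetical.

```java
// Hypothetical helper that mirrors the broker's retry / dead-letter routing rule.
final class RetryRouting {

    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    static String dlqTopic(String consumerGroup) {
        return DLQ_PREFIX + consumerGroup;
    }

    /**
     * Picks the topic a sent-back message is stored in.
     * A negative delayLevel, or exhausted retries, routes to the dead-letter topic.
     */
    static String route(String consumerGroup, int reconsumeTimes, int maxReconsumeTimes, int delayLevel) {
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            return dlqTopic(consumerGroup);
        }
        return retryTopic(consumerGroup);
    }
}
```

For a group named test with the default limit of 16, the first fifteen retries stay on %RETRY%test, and a message whose reconsume count has already reached 16 goes to %DLQ%test.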

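Consumer example

Spring Boot integration (Maven)

// @Slf4j (Lombok) is assumed here; it supplies the `log` field used below
@Slf4j
@Component
@RocketMQMessageListener(topic = "topicTest", consumerGroup = "test", selectorExpression = "tagTest", consumeThreadNumber = 10)
@RequiredArgsConstructor
public static class TestListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String event) {
        log.info("consume event: {}", event);
        // Business logic goes here; an exception thrown from this method triggers a retry
    }
}

RocketMQ Dashboard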

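The topics related to the retry queue appear in the dashboard's topic list.
[Image: RocketMQ Dashboard topic list]
It contains a topic named %RETRY%test; that is, the retry topic is %RETRY% + consumerGroup.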

Source code analysis

To consume messages concurrently, a consumer implements the MessageListener interface. Below is the spring-rocketmq implementation; business code only needs to implement RocketMQListener, as in the consumer example.

org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.DefaultMessageListenerConcurrently
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    for (MessageExt messageExt : msgs) {
        log.debug("received msg: {}", messageExt);
        try {
            long now = System.currentTimeMillis();
            handleMessage(messageExt);
            long costTime = System.currentTimeMillis() - now;
            log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
        } catch (Exception e) {
            log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
            context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

private void handleMessage(MessageExt messageExt) {
    if (rocketMQListener != null) {
        rocketMQListener.onMessage(doConvertMessage(messageExt));
    }
}

The status returned by the business listener decides what happens next.

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
// Set ackIndex according to the status
switch (status) {
    case CONSUME_SUCCESS:
        if (ackIndex >= consumeRequest.getMsgs().size()) {
            ackIndex = consumeRequest.getMsgs().size() - 1;
        }
        int ok = ackIndex + 1;
        int failed = consumeRequest.getMsgs().size() - ok;
        this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
        break;
    case RECONSUME_LATER:
        ackIndex = -1;
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
            consumeRequest.getMsgs().size());
        break;
    default:
        break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            // Broadcasting mode only logs the failure
            log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
        }
        break;
    case CLUSTERING:
        // In clustering mode, if any message fails, every message of this batch is sent back to the broker
        // consumeRequest.getMsgs().size() is determined by ConsumeMessageBatchMaxSize, which defaults to 1
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
        // With ackIndex = -1, i starts at 0 and the loop covers all messages
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }
        if (!msgBackFailed.isEmpty()) {
            consumeRequest.getMsgs().removeAll(msgBackFailed);
            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
        }
        break;
    default:
        break;
}
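The role of ackIndex in the code above is easy to miss, so here is a minimal stand-alone sketch of the same index arithmetic. It is not RocketMQ code: `AckIndexSketch` and `indexesToSendBack` are hypothetical names, and the enum only mirrors the two status values used above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical re-statement of the ackIndex logic in processConsumeResult.
final class AckIndexSketch {

    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /**
     * Returns the positions in the batch that would be sent back to the broker.
     *
     * @param batchSize number of messages handed to the listener
     * @param status    status returned by the listener
     * @param ackIndex  index of the last successfully consumed message
     */
    static List<Integer> indexesToSendBack(int batchSize, Status status, int ackIndex) {
        switch (status) {
            case CONSUME_SUCCESS:
                // Clamp so that a large ackIndex means "whole batch succeeded".
                if (ackIndex >= batchSize) {
                    ackIndex = batchSize - 1;
                }
                break;
            case RECONSUME_LATER:
                // Treat the whole batch as failed.
                ackIndex = -1;
                break;
            default:
                break;
        }
        List<Integer> result = new ArrayList<>();
        for (int i = ackIndex + 1; i < batchSize; i++) {
            result.add(i);
        }
        return result;
    }
}
```

Returning RECONSUME_LATER for a batch of three sends all three messages back, which is one reason the default batch size of 1 is convenient: a single failure cannot drag successfully processed neighbours into a retry.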
sendMessageBack: send the message back to the broker
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    int delayLevel = context.getDelayLevelWhenNextConsume();
    // Wrap topic with namespace before sending back message.
    msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
    try {
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg, e);
    }
    return false;
}

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#sendMessageBack

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) {
    try {
        String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
            : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
    } catch (Exception e) {
        log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
        Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
        String originMsgId = MessageAccessor.getOriginMessageId(msg);
        MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
        MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
        MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
        MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    } finally {
        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
    }
}

org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack

ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

requestHeader.setGroup(consumerGroup);
requestHeader.setOriginTopic(msg.getTopic());
requestHeader.setOffset(msg.getCommitLogOffset());
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
requestHeader.setBname(brokerName);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
    request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
    case ResponseCode.SUCCESS: {
        return;
    }
    default:
        break;
}

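In 4.9.x the request is ultimately sent to the broker over Netty:
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync

In 5.x, the new gRPC clients communicate over gRPC instead of the remoting protocol, and they go through the Proxy rather than connecting directly to the broker.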

Broker-side schedule handling
private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ConsumerSendMsgBackRequestHeader requestHeader =
        (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
    ……
    // Read/write permission checks, etc.
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
        return CompletableFuture.completedFuture(response);
    }
    if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return CompletableFuture.completedFuture(response);
    }
    // Retry topic
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums();
    int topicSysFlag = 0;
    if (requestHeader.isUnitMode()) {
        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
    }
    // Create the retry topic and set its read/write permissions
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
        newTopic,
        subscriptionGroupConfig.getRetryQueueNums(),
        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return CompletableFuture.completedFuture(response);
    }
    if (!PermName.isWriteable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
        return CompletableFuture.completedFuture(response);
    }
    MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
    if (null == msgExt) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("look message by offset failed, " + requestHeader.getOffset());
        return CompletableFuture.completedFuture(response);
    }
    final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    if (null == retryTopic) {
        MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
    }
    msgExt.setWaitStoreMsgOK(false);
    int delayLevel = requestHeader.getDelayLevel();
    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
        Integer times = requestHeader.getMaxReconsumeTimes();
        if (times != null) {
            maxReconsumeTimes = times;
        }
    }
    // Use the retry count to decide between the dead-letter queue and the retry queue
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
        || delayLevel < 0) {
        newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
        queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
            DLQ_NUMS_PER_GROUP,
            PermName.PERM_WRITE | PermName.PERM_READ, 0);
        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return CompletableFuture.completedFuture(response);
        }
        msgExt.setDelayTimeLevel(0);
    } else {
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);
    }
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(msgExt.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
    String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
    MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    // Persist the message
    CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    return putMessageResult.thenApply(r -> {
        if (r != null) {
            switch (r.getPutMessageStatus()) {
                case PUT_OK:
                    String backTopic = msgExt.getTopic();
                    String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                    if (correctTopic != null) {
                        backTopic = correctTopic;
                    }
                    if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msgInner.getTopic())) {
                        this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
                        this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), r.getAppendMessageResult().getWroteBytes());
                        this.brokerController.getBrokerStatsManager().incQueuePutNums(msgInner.getTopic(), msgInner.getQueueId());
                        this.brokerController.getBrokerStatsManager().incQueuePutSize(msgInner.getTopic(), msgInner.getQueueId(), r.getAppendMessageResult().getWroteBytes());
                    }
                    this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
                    response.setCode(ResponseCode.SUCCESS);
                    response.setRemark(null);
                    return response;
                default:
                    break;
            }
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(r.getPutMessageStatus().name());
            return response;
        }
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("putMessageResult is null");
        return response;
    });
}

org.apache.rocketmq.store.MessageStore#asyncPutMessage
org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
org.apache.rocketmq.store.CommitLog#asyncPutMessage

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    String topic = msg.getTopic();
    //        int queueId msg.getQueueId();
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // Persist into SCHEDULE_TOPIC_XXXX; the queueId is derived from delayLevel
            // org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            // Save the real topic and queueId in the message properties
            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
    if (bornSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setBornHostV6Flag();
    }
    InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
    if (storeSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setStoreHostAddressV6Flag();
    }
    PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
    updateMaxMessageSize(putMessageThreadLocal);
    if (!multiDispatch.isMultiDispatchMsg(msg)) {
        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
        if (encodeResult != null) {
            return CompletableFuture.completedFuture(encodeResult);
        }
        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
    }
    PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;
        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
        }
        // Persist the message
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
            case UNKNOWN_ERROR:
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            default:
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
        }
        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
    } finally {
        beginTimeInLock = 0;
        putMessageLock.unlock();
    }
    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }
    ……
}

Consuming the scheduled messages from the commitLog

org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup

public void executeOnTimeup() {
    // Look up the ConsumeQueue (it is created if it does not exist)
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));
    if (cq == null) {
        this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
        return;
    }
    SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
    if (bufferCQ == null) {
        long resetOffset;
        if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
            log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
                this.offset, resetOffset, cq.getQueueId());
        } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
            log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
                this.offset, resetOffset, cq.getQueueId());
        } else {
            resetOffset = this.offset;
        }
        this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
        return;
    }
    long nextOffset = this.offset;
    try {
        int i = 0;
        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
        for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
            long offsetPy = bufferCQ.getByteBuffer().getLong();
            int sizePy = bufferCQ.getByteBuffer().getInt();
            long tagsCode = bufferCQ.getByteBuffer().getLong();
            if (cq.isExtAddr(tagsCode)) {
                if (cq.getExt(tagsCode, cqExtUnit)) {
                    tagsCode = cqExtUnit.getTagsCode();
                } else {
                    //can't find ext content.So re compute tags code.
                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                        tagsCode, offsetPy, sizePy);
                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                }
            }
            long now = System.currentTimeMillis();
            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
            long countdown = deliverTimestamp - now;
            if (countdown > 0) {
                this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                return;
            }
            // Look up the message by its offset
            MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
            if (msgExt == null) {
                continue;
            }
            MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
            if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                    msgInner.getTopic(), msgInner);
                continue;
            }
            boolean deliverSuc;
            // Deliver the message to the retry queue
            if (ScheduleMessageService.this.enableAsyncDeliver) {
                deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
            } else {
                deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
            }
            if (!deliverSuc) {
                this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                return;
            }
        }
        nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    } catch (Exception e) {
        log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
    } finally {
        bufferCQ.release();
    }
    this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}

private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
    int sizePy) {
    PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
    PutMessageResult result = resultProcess.get();
    boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
    if (sendStatus) {
        ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
    }
    return sendStatus;
}

// messageStore persists the message asynchronously --> commitLog --> mappedFile
private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String msgId, long offset,
    long offsetPy, int sizePy, boolean autoResend) {
    CompletableFuture<PutMessageResult> future =
        ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
    return new PutResultProcess()
        .setTopic(msgInner.getTopic())
        .setDelayLevel(this.delayLevel)
        .setOffset(offset)
        .setPhysicOffset(offsetPy)
        .setPhysicSize(sizePy)
        .setMsgId(msgId)
        .setAutoResend(autoResend)
        .setFuture(future)
        .thenProcess();
}
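Taken together, the two broker steps above amount to a round trip: the message is parked in the schedule topic with its real destination saved in properties, then restored when the delay is up. The following stand-alone sketch illustrates that idea only. The `Msg` class and method names are hypothetical, the property key strings are stand-ins for MessageConst.PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID, and the queue mapping `delayLevel - 1` is an assumption about what delayLevel2QueueId returns.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of parking a message in the schedule topic and restoring it.
final class ScheduleRoundTripSketch {

    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    // Stand-ins for the real property keys; treat the exact strings as assumptions.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QID = "REAL_QID";

    static final class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    /** Parks a delayed message: back up the real destination, then swap in the schedule topic. */
    static void park(Msg msg) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message, store it as-is
        }
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = msg.delayLevel - 1; // assumed mapping: one queue per delay level
    }

    /** Restores the real destination once the delay has elapsed. */
    static void restore(Msg msg) {
        msg.topic = msg.properties.get(REAL_TOPIC);
        msg.queueId = Integer.parseInt(msg.properties.get(REAL_QID));
        msg.delayLevel = 0; // prevents the message from being parked again
    }
}
```

A retry message destined for %RETRY%test at delay level 3 would sit in queue 2 of the schedule topic until its 10 seconds are up, and only then become visible in %RETRY%test.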
Subscribing to the retry queue

Once the configured delay has elapsed and the message has been delivered to the retry queue, consumers can consume it.

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#copySubscription

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        break;
    // Only clustering mode subscribes to the retry queue
    case CLUSTERING:
        final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
        this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
        break;
    default:
        break;
}
Pulling messages
Resetting the topic
public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
    final String groupTopic = MixAll.getRetryTopic(consumerGroup);
    for (MessageExt msg : msgs) {
        String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
            msg.setTopic(retryTopic);
        }
        if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
        }
    }
}
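The topic reset above is why a listener never sees the %RETRY% topic name. A dependency-free sketch of the same rule, with the namespace handling left out, might look like this; `TopicResetSketch` and `effectiveTopic` are hypothetical names.

```java
// Hypothetical re-statement of the topic reset applied to pulled retry messages.
final class TopicResetSketch {

    /**
     * @param msgTopic           topic the message was pulled from
     * @param retryTopicProperty value of the RETRY_TOPIC property (the original topic), or null
     * @param consumerGroup      the consumer group doing the pull
     * @return the topic name the listener will observe
     */
    static String effectiveTopic(String msgTopic, String retryTopicProperty, String consumerGroup) {
        String groupTopic = "%RETRY%" + consumerGroup;
        // Only messages pulled from this group's own retry topic are rewritten.
        if (retryTopicProperty != null && groupTopic.equals(msgTopic)) {
            return retryTopicProperty;
        }
        return msgTopic;
    }
}
```

A message pulled from %RETRY%test that carries the original topic topicTest is therefore handed to the listener as a topicTest message, so business code needs no special case for retries.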

Re-running the business logic
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage

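Summary

The main retry flow:
1. The consumer fails to consume a message and sends it back to the broker;
2. On receiving the retry message, the broker first stores it in the schedule (delay) queue;
3. After a delay that depends on the retry count, the broker redelivers the message to the retryTopic;
4. The consumer pulls messages from the retryTopic that belongs to its consumerGroup;
5. After pulling a retryTopic message, the consumer maps it back to its original topic and hands it to the messageListener implementation.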

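Q&A

1. Why is the message not sent back to the original queue?
Because several consumer groups may have consumed the same message while only one of them failed. Sending it back to the original queue could cause the other consumer groups to consume it again.

2. Where can the number of retry queues, that is, the number of consumerGroups, have an impact?
a) When a broker registers with the nameserver it registers its topic information, and every consumerGroup adds a retry topic;
b) Every concurrently consuming consumer starts a thread pool with the configured number of core threads;
c) Subscription consistency;
d) Client rebalancing.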

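Kafka retries

Native support in Kafka

Kafka itself does not support retries, delayed delivery, or dead-letter queues.
spring-kafka wraps a retry-queue implementation in which every topic gets its own retry topics. Too many topics, however, hurt Kafka's throughput.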

Implementing custom retry-queue logic

1. Implement KafkaListenerErrorHandler and override handleError. After a consumption failure, add a retryCount to the message headers, record the id of the group that failed, and use retryCount to decide whether to send the message to the retry topic or the dead topic.

@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
    log.warn("Message consumption failed; it will be retried or dead-lettered", exception);
    ConsumerRecord record = (ConsumerRecord) message.getPayload();
    // Read the custom header; if it is present, read how many times the message has been retried
    Header retryHeader = record.headers().lastHeader("__retry_dead_head__");
    Integer retryTimes = 0;
    if (retryHeader != null && retryHeader.value() != null) {
        retryTimes = (Integer) JsonUtils.toData(stringDeserializer.deserialize(null, retryHeader.value()), Map.class).get(RETRY_KEY);
    }
    Headers headers = record.headers();
    String groupId = consumer.groupMetadata().groupId();
    Map<String, Object> custData = new HashMap<>();
    // Fill in the custom header data, such as the original topic and groupId
    assembleCustData(custData);
    if (retryTimes >= maxRetryCount) {
        message2Dead(record, custData, headers);
    } else {
        // Retry limit not reached yet: send to the retry queue again
        message2Retry(record, retryTimes, custData, headers);
    }
    // Commit the offset
    consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)));
    return null;
}

private void message2Retry(ConsumerRecord record, Integer retryTimes, Map<String, Object> custData, Headers headers) {
    custData.put(RETRY_KEY, retryTimes + 1);
    try {
        // Use one shared retry topic
        kafkaTemplate.send(new ProducerRecord<>(RETRY_TOPIC, null, null, record.key(), record.value(), buildNewHeaders(headers, custData))).get();
    } catch (Exception ex) {
        throw new BusinessException("Failed to send the message to the retry queue", ex);
    }
}
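The decision at the heart of the handler above can be isolated and exercised without Kafka. The sketch below is only an illustration: the topic names, the `retryCount` key, and the class itself are hypothetical, and a plain Map stands in for the custom header data that the real handler serializes into a record header.

```java
import java.util.Map;

// Hypothetical, Kafka-free model of the retry-or-dead decision in the error handler.
final class KafkaRetryDecisionSketch {

    static final String RETRY_KEY = "retryCount";   // hypothetical header key
    static final String RETRY_TOPIC = "app.retry";  // hypothetical shared retry topic
    static final String DEAD_TOPIC = "app.dead";    // hypothetical dead-letter topic

    /**
     * Decides where a failed record goes and, for a retry, bumps the counter
     * that travels with the record.
     */
    static String nextTopic(Map<String, Object> custData, int maxRetryCount) {
        int retryTimes = (Integer) custData.getOrDefault(RETRY_KEY, 0);
        if (retryTimes >= maxRetryCount) {
            return DEAD_TOPIC;
        }
        custData.put(RETRY_KEY, retryTimes + 1);
        return RETRY_TOPIC;
    }
}
```

With maxRetryCount set to 2, a record is routed to the retry topic twice and to the dead topic on its third failure. Keeping the counter in the record's own headers means no external state is needed to track retries.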

1.1. Register the retry handler for the consumer

@Bean
public KafkaListenerErrorHandler kafkaListenerErrorHandler() {
    Map<String, Object> produceConfig = kafkaProperties.buildProducerProperties();
    produceConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    ProducerFactory producerFactory = new DefaultKafkaProducerFactory(produceConfig);
    KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory);
    RetryListenerErrorHandler kafkaListenerErrorHandler = new RetryListenerErrorHandler(kafkaTemplate, appName);
    return kafkaListenerErrorHandler;
}

2. Listen on RETRY_TOPIC

@KafkaListener(topics = {RETRY_TOPIC})
public void retryDeal(ConsumerRecord<byte[], byte[]> record, Acknowledgment ack) throws ExecutionException, InterruptedException, UnsupportedEncodingException {
    Header customerHead = record.headers().lastHeader(RetryListenerErrorHandler.CUSTOMER_HEAD_NAME);
    Long timestamp = record.timestamp();
    if (System.currentTimeMillis() < timestamp + RETRY_DELAY_DURATION.toMillis()) {
        try {
            Thread.sleep((timestamp + RETRY_DELAY_DURATION.toMillis()) - System.currentTimeMillis());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    Map<String, Object> custData = JsonUtils.toData(new StringDeserializer().deserialize(null, customerHead.value()), Map.class);
    String topic = (String) custData.get(RetryListenerErrorHandler.CONSUMER_TOPIC);
    Integer partition = (Integer) custData.get(RetryListenerErrorHandler.PARTITION_KEY);
    // Once the delay has elapsed, send the message back to its original topic and partition
    kafkaTemplate.send(new ProducerRecord<>(topic, partition, null, record.key(), record.value(), record.headers())).get();
    ack.acknowledge();
}
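The delay in the listener above boils down to one calculation: how much of the retry delay is still outstanding for a record with a given timestamp. A small sketch of that calculation follows; the class and method names are hypothetical, and the current time is passed in as a parameter so the logic can be checked without sleeping.

```java
import java.time.Duration;

// Hypothetical helper: how long the retry listener still has to wait for a record.
final class RetryDelaySketch {

    /**
     * @param recordTimestamp timestamp of the record in the retry topic, in epoch milliseconds
     * @param retryDelay      delay that must pass before the record is re-sent
     * @param now             current time in epoch milliseconds
     * @return milliseconds left to wait, never negative
     */
    static long remainingDelayMillis(long recordTimestamp, Duration retryDelay, long now) {
        return Math.max(0L, recordTimestamp + retryDelay.toMillis() - now);
    }
}
```

Computing the wait once from a single reading of the clock also avoids a subtle hazard in the inline version, where the clock is read twice and the second reading could yield a negative sleep time.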

3. Implement org.apache.kafka.clients.consumer.ConsumerInterceptor and override onConsume

@Override
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
    KafkaUtils.getConsumerGroupId();
    Map<TopicPartition, List<ConsumerRecord<K, V>>> resultRecordMap = new HashMap<>(records.count());
    records.partitions().forEach(p -> {
        List<ConsumerRecord<K, V>> recordList = records.records(p);
        // Apply the custom filter (for example by groupId) to decide whether to consume each record
        List<ConsumerRecord<K, V>> filteredRecords = recordList.stream().filter(this::predict).collect(Collectors.toList());
        if (!filteredRecords.isEmpty()) {
            resultRecordMap.put(p, filteredRecords);
        }
    });
    return new ConsumerRecords<>(resultRecordMap);
}
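The article does not show the `predict` filter, so the following is only a guess at its intent, written without Kafka types. It assumes that a re-sent record carries the id of the group that failed, and that every other group should skip it; `Rec`, `targetGroup`, and `filterForGroup` are hypothetical names.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of a group-based filter for records re-sent to the original topic.
final class GroupFilterSketch {

    static final class Rec {
        final String value;
        final String targetGroup; // group that failed earlier; null for ordinary records

        Rec(String value, String targetGroup) {
            this.value = value;
            this.targetGroup = targetGroup;
        }
    }

    /** Keeps ordinary records plus retried records addressed to this consumer group. */
    static List<Rec> filterForGroup(List<Rec> records, String myGroup) {
        return records.stream()
                .filter(r -> r.targetGroup == null || r.targetGroup.equals(myGroup))
                .collect(Collectors.toList());
    }
}
```

This filtering is what makes it safe to send a retried record back to the original topic: groups that already consumed it successfully drop the copy, which addresses the duplicate-consumption concern that RocketMQ solves with per-group retry topics.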

References

https://rocketmq.apache.org/zh/docs/4.x/consumer/02push
https://rocketmq.apache.org/zh/docs/featureBehavior/10consumerretrypolicy
