
How PushConsumer and SimpleConsumer Differ When Pulling Messages

2025/2/22 3:19:25  Source: https://blog.csdn.net/hello_ejb3/article/details/141101023

This article looks at how PushConsumer and SimpleConsumer differ when pulling messages in RocketMQ 5.

ProcessQueueImpl

org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java

    private void receiveMessageImmediately(String attemptId) {
        final ClientId clientId = consumer.getClientId();
        if (!consumer.isRunning()) {
            log.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq, clientId);
            return;
        }
        try {
            final Endpoints endpoints = mq.getBroker().getEndpoints();
            final int batchSize = this.getReceptionBatchSize();
            final Duration longPollingTimeout = consumer.getPushConsumerSettings().getLongPollingTimeout();
            final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
                longPollingTimeout, attemptId);
            activityNanoTime = System.nanoTime();

            // Intercept before message reception.
            final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
            consumer.doBefore(context, Collections.emptyList());

            final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
                longPollingTimeout);
            Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
                @Override
                public void onSuccess(ReceiveMessageResult result) {
                    // Intercept after message reception.
                    final List<GeneralMessage> generalMessages = result.getMessageViewImpls().stream()
                        .map((Function<MessageView, GeneralMessage>) GeneralMessageImpl::new)
                        .collect(Collectors.toList());
                    final MessageInterceptorContextImpl context0 =
                        new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK);
                    consumer.doAfter(context0, generalMessages);

                    try {
                        onReceiveMessageResult(result);
                    } catch (Throwable t) {
                        // Should never reach here.
                        log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "
                            + "clientId={}", mq, endpoints, clientId, t);
                        onReceiveMessageException(t, attemptId);
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    String nextAttemptId = null;
                    if (t instanceof StatusRuntimeException) {
                        StatusRuntimeException exception = (StatusRuntimeException) t;
                        if (org.apache.rocketmq.shaded.io.grpc.Status.DEADLINE_EXCEEDED.getCode() ==
                            exception.getStatus().getCode()) {
                            nextAttemptId = request.getAttemptId();
                        }
                    }
                    // Intercept after message reception.
                    final MessageInterceptorContextImpl context0 =
                        new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR);
                    consumer.doAfter(context0, Collections.emptyList());

                    log.error("Exception raised during message reception, mq={}, endpoints={}, attemptId={}, " +
                        "nextAttemptId={}, clientId={}", mq, endpoints, request.getAttemptId(), nextAttemptId,
                        clientId, t);
                    onReceiveMessageException(t, nextAttemptId);
                }
            }, MoreExecutors.directExecutor());
            receptionTimes.getAndIncrement();
            consumer.getReceptionTimes().getAndIncrement();
        } catch (Throwable t) {
            log.error("Exception raised during message reception, mq={}, clientId={}", mq, clientId, t);
            onReceiveMessageException(t, attemptId);
        }
    }

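PushConsumer pulls messages through ProcessQueueImpl's receiveMessageImmediately. Internally it calls consumer.receiveMessage(request, mq, longPollingTimeout), and the request is built by consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression, longPollingTimeout, attemptId). If the call fails with a gRPC DEADLINE_EXCEEDED status, the attemptId of the failed request is passed on as nextAttemptId; for any other failure nextAttemptId stays null.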

SimpleConsumerImpl

org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java

    public ListenableFuture<List<MessageView>> receive0(int maxMessageNum, Duration invisibleDuration) {
        if (!this.isRunning()) {
            log.error("Unable to receive message because simple consumer is not running, state={}, clientId={}",
                this.state(), clientId);
            final IllegalStateException e = new IllegalStateException("Simple consumer is not running now");
            return Futures.immediateFailedFuture(e);
        }
        if (maxMessageNum <= 0) {
            final IllegalArgumentException e = new IllegalArgumentException("maxMessageNum must be greater than 0");
            return Futures.immediateFailedFuture(e);
        }
        final HashMap<String, FilterExpression> copy = new HashMap<>(subscriptionExpressions);
        final ArrayList<String> topics = new ArrayList<>(copy.keySet());
        // All topic is subscribed.
        if (topics.isEmpty()) {
            final IllegalArgumentException e = new IllegalArgumentException("There is no topic to receive message");
            return Futures.immediateFailedFuture(e);
        }
        final String topic = topics.get(IntMath.mod(topicIndex.getAndIncrement(), topics.size()));
        final FilterExpression filterExpression = copy.get(topic);
        final ListenableFuture<SubscriptionLoadBalancer> routeFuture = getSubscriptionLoadBalancer(topic);
        final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {
            final MessageQueueImpl mq = result.takeMessageQueue();
            final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
                invisibleDuration, awaitDuration);
            return receiveMessage(request, mq, awaitDuration);
        }, MoreExecutors.directExecutor());
        return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.getMessageViews()),
            clientCallbackExecutor);
    }

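SimpleConsumerImpl's receive0 also pulls messages through ConsumerImpl's receiveMessage(request, mq, awaitDuration). Its request is built by wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, invisibleDuration, awaitDuration). Before that, it picks one subscribed topic per call by rotating topicIndex over the topic list, then takes a message queue from that topic's SubscriptionLoadBalancer.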
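The topic rotation in receive0 can be sketched in isolation. This is a minimal illustration, not the client's code: the class name TopicRoundRobin is hypothetical, and Math.floorMod from the JDK is used here as a stand-in for Guava's IntMath.mod so the example needs no dependencies.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the topic rotation inside receive0; not a class of the client.
final class TopicRoundRobin {
    private final AtomicInteger topicIndex = new AtomicInteger(0);

    // Math.floorMod stands in for IntMath.mod: with a positive divisor the result
    // is never negative, even after the counter overflows past Integer.MAX_VALUE.
    String next(List<String> topics) {
        if (topics.isEmpty()) {
            throw new IllegalArgumentException("There is no topic to receive message");
        }
        return topics.get(Math.floorMod(topicIndex.getAndIncrement(), topics.size()));
    }

    public static void main(String[] args) {
        TopicRoundRobin picker = new TopicRoundRobin();
        List<String> topics = Arrays.asList("topicA", "topicB"); // illustrative topic names
        for (int i = 0; i < 4; i++) {
            System.out.println(picker.next(topics));
        }
    }
}
```

Each call to receive0 therefore targets a single topic, and successive calls move on to the next subscribed topic.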

ConsumerImpl

org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java

receiveMessage

    protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request,
        MessageQueueImpl mq, Duration awaitDuration) {
        List<MessageViewImpl> messages = new ArrayList<>();
        try {
            final Endpoints endpoints = mq.getBroker().getEndpoints();
            final Duration tolerance = clientConfiguration.getRequestTimeout();
            final Duration timeout = awaitDuration.plus(tolerance);
            final ClientManager clientManager = this.getClientManager();
            final RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>> future =
                clientManager.receiveMessage(endpoints, request, timeout);
            return Futures.transformAsync(future, responses -> {
                Status status = Status.newBuilder()
                    .setCode(Code.INTERNAL_SERVER_ERROR)
                    .setMessage("status was not set by server")
                    .build();
                Long transportDeliveryTimestamp = null;
                List<Message> messageList = new ArrayList<>();
                for (ReceiveMessageResponse response : responses) {
                    switch (response.getContentCase()) {
                        case STATUS:
                            status = response.getStatus();
                            break;
                        case MESSAGE:
                            messageList.add(response.getMessage());
                            break;
                        case DELIVERY_TIMESTAMP:
                            final Timestamp deliveryTimestamp = response.getDeliveryTimestamp();
                            transportDeliveryTimestamp = Timestamps.toMillis(deliveryTimestamp);
                            break;
                        default:
                            log.warn("[Bug] Not recognized content for receive message response, mq={}, " +
                                "clientId={}, response={}", mq, clientId, response);
                    }
                }
                for (Message message : messageList) {
                    final MessageViewImpl view = MessageViewImpl.fromProtobuf(message, mq, transportDeliveryTimestamp);
                    messages.add(view);
                }
                StatusChecker.check(status, future);
                final ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(endpoints, messages);
                return Futures.immediateFuture(receiveMessageResult);
            }, MoreExecutors.directExecutor());
        } catch (Throwable t) {
            // Should never reach here.
            log.error("[Bug] Exception raised during message receiving, mq={}, clientId={}", mq, clientId, t);
            return Futures.immediateFailedFuture(t);
        }
    }

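The receiveMessage method pulls messages through clientManager.receiveMessage(endpoints, request, timeout), where timeout is awaitDuration plus the configured request timeout, and then converts the responses into a ReceiveMessageResult.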
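The RPC timeout used by receiveMessage is the long-polling wait plus the request timeout as a tolerance. A minimal sketch of that arithmetic follows; the class name ReceiveTimeoutSketch and the two duration values are assumptions chosen for illustration, not defaults taken from the client.

```java
import java.time.Duration;

// Hypothetical helper that isolates the timeout arithmetic of receiveMessage.
final class ReceiveTimeoutSketch {
    // Mirrors `awaitDuration.plus(tolerance)`: the RPC must outlive the long poll.
    static Duration rpcTimeout(Duration awaitDuration, Duration requestTimeout) {
        return awaitDuration.plus(requestTimeout);
    }

    public static void main(String[] args) {
        Duration awaitDuration = Duration.ofSeconds(30); // assumed long-polling wait
        Duration requestTimeout = Duration.ofSeconds(3); // assumed request timeout
        System.out.println(rpcTimeout(awaitDuration, requestTimeout)); // prints PT33S
    }
}
```

Adding the tolerance gives the server the whole long-polling window to hold the request before the client-side deadline fires.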

wrapReceiveMessageRequest(ProcessQueueImpl)

    ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
        FilterExpression filterExpression, Duration longPollingTimeout, String attemptId) {
        attemptId = null == attemptId ? UUID.randomUUID().toString() : attemptId;
        return ReceiveMessageRequest.newBuilder()
            .setGroup(getProtobufGroup())
            .setMessageQueue(mq.toProtobuf())
            .setFilterExpression(wrapFilterExpression(filterExpression))
            .setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
            .setBatchSize(batchSize)
            .setAutoRenew(true)
            .setAttemptId(attemptId)
            .build();
    }

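The wrapReceiveMessageRequest overload called by ProcessQueueImpl takes only batchSize, mq, filterExpression, longPollingTimeout and attemptId. When attemptId is null it generates a random UUID, and it always sets autoRenew to true.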
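The way attemptId flows through the push path can be sketched as two small functions. This is a simplified model under one stated assumption: that onReceiveMessageException eventually hands nextAttemptId back to receiveMessageImmediately, which the excerpts above do not show. The class AttemptIdSketch and its method names are hypothetical.

```java
import java.util.UUID;

// Hypothetical model of the attemptId handling on the push path.
final class AttemptIdSketch {
    // Same defaulting rule as wrapReceiveMessageRequest: keep a supplied id, otherwise generate one.
    static String resolveAttemptId(String attemptId) {
        return attemptId == null ? UUID.randomUUID().toString() : attemptId;
    }

    // Simplified form of the onFailure branch: only a DEADLINE_EXCEEDED failure
    // carries the id of the failed request forward; other failures yield null.
    static String nextAttemptId(boolean deadlineExceeded, String requestAttemptId) {
        return deadlineExceeded ? requestAttemptId : null;
    }

    public static void main(String[] args) {
        String first = resolveAttemptId(null);
        // Assumption: the retry is issued with whatever nextAttemptId returned.
        String afterTimeout = resolveAttemptId(nextAttemptId(true, first));
        String afterOtherError = resolveAttemptId(nextAttemptId(false, first));
        System.out.println("same id after deadline exceeded: " + first.equals(afterTimeout));
        System.out.println("same id after other failure: " + first.equals(afterOtherError));
    }
}
```

Under that assumption, a retry after a timeout reuses the id of the timed-out request, while any other failure starts over with a fresh id.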

wrapReceiveMessageRequest(SimpleConsumerImpl)

    ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
        FilterExpression filterExpression, Duration invisibleDuration, Duration longPollingTimeout) {
        final org.apache.rocketmq.shaded.com.google.protobuf.Duration duration =
            Durations.fromNanos(invisibleDuration.toNanos());
        return ReceiveMessageRequest.newBuilder()
            .setGroup(getProtobufGroup())
            .setMessageQueue(mq.toProtobuf())
            .setFilterExpression(wrapFilterExpression(filterExpression))
            .setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
            .setBatchSize(batchSize)
            .setAutoRenew(false)
            .setInvisibleDuration(duration)
            .build();
    }

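The wrapReceiveMessageRequest overload called by SimpleConsumerImpl takes only maxMessageNum (batchSize), mq, filterExpression, invisibleDuration and awaitDuration (longPollingTimeout).
The difference between the two overloads is that the first sets autoRenew to true and sets an attemptId, while the second sets autoRenew to false and sets an invisibleDuration.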
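The two request shapes can be put side by side in a small model. ReceiveRequestSketch below is a hypothetical plain-Java stand-in for the protobuf ReceiveMessageRequest that keeps only the three fields that differ; the 15-second invisible duration is an arbitrary illustrative value.

```java
import java.time.Duration;
import java.util.UUID;

// Hypothetical, simplified stand-in for the protobuf ReceiveMessageRequest.
final class ReceiveRequestSketch {
    final boolean autoRenew;
    final String attemptId;           // null when the field is left unset
    final Duration invisibleDuration; // null when the field is left unset

    private ReceiveRequestSketch(boolean autoRenew, String attemptId, Duration invisibleDuration) {
        this.autoRenew = autoRenew;
        this.attemptId = attemptId;
        this.invisibleDuration = invisibleDuration;
    }

    // Push path: autoRenew=true plus an attemptId, no invisibleDuration.
    static ReceiveRequestSketch forPush(String attemptId) {
        String id = attemptId == null ? UUID.randomUUID().toString() : attemptId;
        return new ReceiveRequestSketch(true, id, null);
    }

    // Simple path: autoRenew=false plus an explicit invisibleDuration, no attemptId.
    static ReceiveRequestSketch forSimple(Duration invisibleDuration) {
        return new ReceiveRequestSketch(false, null, invisibleDuration);
    }

    public static void main(String[] args) {
        ReceiveRequestSketch push = forPush(null);
        ReceiveRequestSketch simple = forSimple(Duration.ofSeconds(15)); // illustrative value
        System.out.println("push:   autoRenew=" + push.autoRenew
            + ", attemptId set=" + (push.attemptId != null));
        System.out.println("simple: autoRenew=" + simple.autoRenew
            + ", invisibleDuration=" + simple.invisibleDuration);
    }
}
```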

Summary

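In RocketMQ 5, both PushConsumer and SimpleConsumer pull messages through ConsumerImpl's receiveMessage method. They differ in the ReceiveMessageRequest they build: PushConsumer sets autoRenew to true and sets an attemptId, while SimpleConsumer sets autoRenew to false and sets an invisibleDuration.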
