您的位置:首页 > 汽车 > 时评 > 龙口网站设计_广州网站推广解决方案_成都网站快速排名提升_重庆seo标准

龙口网站设计_广州网站推广解决方案_成都网站快速排名提升_重庆seo标准

2024/11/20 13:21:34 来源:https://blog.csdn.net/weixin_42118323/article/details/142830124  浏览:    关键词:龙口网站设计_广州网站推广解决方案_成都网站快速排名提升_重庆seo标准
龙口网站设计_广州网站推广解决方案_成都网站快速排名提升_重庆seo标准

前言:

在消息中间件领域中 RocketMQ 是一种非常常见的消息中间件了,并且是由阿⾥巴巴开源的消息中间件 ,本篇简单分享一下 Spring Boot 项目集成 RocketMQ 的过程。

RocketMQ 系列文章传送门

RocketMQ 的介绍及核心概念讲解

Spring Boot 集成 RocketMQ 可以分为三大步,如下:

  • 在 proerties 或者 yml 文件中添加 RocketMQ 配置。
  • 项目 pom.xml 文件中引入 rocketmq-spring-boot-starter 依赖。
  • 注入 RocketMQTemplate 开始使用 RocketMQ ,其实这步以及算是使用了,不能算作集成了,但是集成了总归是要使用的,我把这里也算作一步了。

在 proerties 或者 yml 文件中添加 RabbitMQ 配置如下:

#RocketMQ 地址
rocketmq.name-server= xxx-xxx-rocketmq.xxxx.com:19876
#消费组
rocketmq.consumer.group= consumer-group
#一次拉取消息的最大数量 默认 10 条
rocketmq.consumer.pull-batch-size=10
#发送消息的组 同一类消息发送到同一个组中
rocketmq.producer.group= producer-group
#发送消息的超时时间 默认 3000 毫秒
rocketmq.producer.send-message-timeout=3000
#同步发送消息失败重试次数 默认2
rocketmq.producer.retry-times-when-send-failed=2
#异步发送消息失败重试次数 默认2
rocketmq.producer.retry-times-when-send-async-failed=2
#消息的大小 默认 4M
rocketmq.producer.max-message-size=4096

项目 pom.xml 文件中引入 rocketmq-spring-boot-starter 依赖如下:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>

RocketMQ 使用

前文我们在分享 RocketMQ 核心概念的时候,我们知道了 RocketMQ 有同步消息、异步消息、顺序消息、延迟消息等,下面我们就根据消息的发送类型来演示 RocketMQ 的使用。

@RocketMQMessageListener 注解详解

我们在使用 RocketMQ 的时候有一个非常重要的注解 @RocketMQMessageListener,使用这个注解我们就可以轻松的完成 RocketMQ 消息的消费,这里对该注解的的属性进行解析,如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {// nameServer服务地址 多个用;隔开 可以直接在注解中指定也可以读取配置文件String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";// ACL验证key 服务端开启了ACL时使用 可以直接在注解中指定也可以读取配置文件String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";// ACL验证密钥 服务端开启了ACL时使用 可以直接在注解中指定也可以读取配置文件String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";// 自定义的消息轨迹主题String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";// 消费者分组 不同消费者分组名称不能重复String consumerGroup();// topic名称String topic();// selectorType 消息选择器类型// 默认值 SelectorType.TAG 根据TAG选择// 仅支持表达式格式如:“tag1 || tag2 || tag3” 如果表达式为null或者“*”标识订阅所有消息// SelectorType.SQL92 根据SQL92表达式选择SelectorType selectorType() default SelectorType.TAG;String selectorExpression() default "*";// 消费模式 可以选择并发或有序接收消息 默认并发消费模式 ConsumeMode.CONCURRENTLYConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;// 控制消息模式 可以选择集群和广播 默认是集群 MessageModel.CLUSTERING// 集群模式: 消息只会被一个消费者消费 广播模式:消息被所有消费者都消费一次MessageModel messageModel() default MessageModel.CLUSTERING;// 消费者最大线程数 在5.x版本该参数已经不推荐使用 因为该实现方式底层线程使用 LinkedBlockingQueue 作为阻塞队列 队列长度使用Integer.MAX_VALUE。@Deprecatedint consumeThreadMax() default 64;// 消费线程数 属于 rocketmq-spring-boot-starter 2.2.3新参数 推荐使用该版本int consumeThreadNumber() default 20;// 消费失败重试次数 在MessageModel.CLUSTERING模式中,-1表示16,消费失败后会重试16次int maxReconsumeTimes() default -1;// 最大消费时间 默认15分钟long consumeTimeout() default 15L;// 发送回复消息超时 默认3000毫秒int replyTimeout() default 3000;// ACL验证key 服务端开启了ACL时使用 可以直接在注解中指定也可以读取配置文件String accessKey() default ACCESS_KEY_PLACEHOLDER;// ACL验证密钥 服务端开启了ACL时使用 可以直接在注解中指定也可以读取配置文件String secretKey() default SECRET_KEY_PLACEHOLDER;// 切换消息跟踪的标志实例boolean enableMsgTrace() default false;// 自定义跟踪主题String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;// nameServer服务地址 可以直接在注解中指定也可以读取配置文件String nameServer() default NAME_SERVER_PLACEHOLDER;// The property of "access-channel".String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;// The property of "tlsEnable" default false.String tlsEnable() default "false";// 使用者的命名空间String namespace() default "";// 并发模式下的消息消耗重试策略 下次消费时的延迟级别int delayLevelWhenNextConsume() default 0;// 以有序模式暂停拉入的间隔 以毫秒为单位 最小值为10 最大值为30000 默认1000毫秒int suspendCurrentQueueTimeMillis() default 1000;// 关闭使用者时等待消息消耗的最长时间 以毫秒为单位  最小值为0 默认1000毫秒int awaitTerminationMillisWhenShutdown() default 1000;// 实例名称String instanceName() default "DEFAULT";
}

RocketMQ 发送单向消息

单向消息是指生产者 Producer 向 Broker 发送消息,执行发送消息的 API 后直接返回,不关注 Broker 的结果,简单说就负责发送消息不关注消息是否发送成功,这种模式的优点是发生消息耗时非常低,一般在微妙级别,通常用在消息可靠性要求不高的场景,例如记录日志等场景,下面我们来演示一下 RocketMQ 单向消息的发送。

单向消息生产者代码如下:

package com.order.service.rocketmq.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class OneWayMessageProducer {@Autowiredprivate RocketMQTemplate rocketMqTemplate;//单向消息发送public void sendOneWayMessage(String message){rocketMqTemplate.sendOneWay("one-way-topic", MessageBuilder.withPayload(message).build());}}

单向消息消费者代码如下:

package com.order.service.rocketmq.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "one-way-group", topic = "one-way-topic")
public class OneWayMessageCousumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("单向消息消费成功:{}", message);}
}

触发单向消息发送代码如下:

@GetMapping("/send-one-way-message")
public String sendOneWayMessage(@RequestParam String message){oneWayMessageProducer.sendOneWayMessage(message);return "success";
}

单向消息发送测试结果如下:

2024-10-10 19:51:47.144  INFO 15172 --- [MessageThread_1] c.o.s.r.consumer.OneWayMessageCousumer   : 单向消息消费成功:send-one-way-message

RocketMQ 发送同步消息

发送同步消息是指生产者 Producer 向 Broker 发送消息,执行发送消息的 API 后同步等待, 直到 Broker 返回发送结
果,因为有等待动作,很明显发送同步消息会阻塞线程,因此性能相对会差一些,但是同步消息的可靠性高,因此这种方式得到广泛使用,例如短信通知,邮件通知,站内消息等场景。

同步消息生产者代码如下:

package com.order.service.rocketmq.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;@Slf4j
@Component
public class SyncMessageProducer {@Autowiredprivate RocketMQTemplate rocketMqTemplate;/*** @param message:* @date 2024/10/10 17:47* @description 同步消息发送*/public void sendSyncMessage(String message) {rocketMqTemplate.syncSend("sync-topic", MessageBuilder.withPayload(message).build());}/*** @param message:* @date 2024/10/10 17:47* @description 批量发送同步消息*/public void sendSyncMessageBatch(String message) {Message<String> build = MessageBuilder.withPayload(message).build();List<Message<String>> msgList = new ArrayList<>();msgList.add(build);msgList.add(build);msgList.add(build);msgList.add(build);rocketMqTemplate.syncSend("sync-topic", msgList);}/*** @param message:* @date 2024/10/10 17:47* @description 发送同步消息设置超时时间 超时时间 1毫秒*/public void sendSyncMessageTimeout(String message) {//超时时间为 1 毫秒rocketMqTemplate.syncSend("sync-topic", MessageBuilder.withPayload(message).build(), 200);}/*** @param message:* @date 2024/10/10 17:47* @description 批量发送同步消息 超时时间 1毫秒*/public void sendSyncMessageBatchTimeout(String message) {Message<String> build = MessageBuilder.withPayload(message).build();List<Message<String>> msgList = new ArrayList<>();msgList.add(build);msgList.add(build);msgList.add(build);msgList.add(build);rocketMqTemplate.syncSend("sync-topic", msgList, 200);}}

在上面的同步消息发送代码中一共有四个方法,分别实现了同步消息发送、同步消息批量发送、带超时时间的同步消息发送、带超时时间的同步消息批量发送。

同步消息消息消费者代码如下:

package com.order.service.rocketmq.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "sync-group", topic = "sync-topic")
public class SyncMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("同步消息消费成功:{}", message);}
}

触发单向消息发送代码如下:

@GetMapping("/send-sync-message")
public String sendSyncMessage(@RequestParam String message){syncMessageProducer.sendSyncMessage(message);return "success";
}@GetMapping("/send-sync-message-batch")
public String sendSyncMessageBatch(@RequestParam String message){syncMessageProducer.sendSyncMessageBatch(message);return "success";
}@GetMapping("/send-sync-message-timeout")
public String sendSyncMessageTimeout(@RequestParam String message){syncMessageProducer.sendSyncMessageTimeout(message);return "success";
}@GetMapping("/send-sync-message-batch-timeout")
public String sendSyncMessageBatchTimeout(@RequestParam String message){syncMessageProducer.sendSyncMessageBatchTimeout(message);return "success";
}

同步消息发送测试结果如下:

2024-10-14 14:37:22.346  INFO 26640 --- [MessageThread_1] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message

结果验证符合预期。

同步消息批量发送测试结果如下:

2024-10-14 14:38:04.120  INFO 26640 --- [MessageThread_4] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch
2024-10-14 14:38:04.120  INFO 26640 --- [MessageThread_3] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch
2024-10-14 14:38:04.120  INFO 26640 --- [MessageThread_5] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch
2024-10-14 14:38:04.122  INFO 26640 --- [MessageThread_6] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch

结果验证符合预期。

带超时时间同步消息发送测试结果如下:

2024-10-14 14:46:07.889  INFO 16760 --- [MessageThread_1] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-timeout

结果验证符合预期,如果想要验证超时效果,直接把超时时间设置的小一点即可,后面我会统一演示超时效果。

带超时时间同步消息批量发送测试结果如下:

2024-10-14 14:47:05.539  INFO 16760 --- [MessageThread_3] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch-timeout
2024-10-14 14:47:05.539  INFO 16760 --- [MessageThread_4] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch-timeout
2024-10-14 14:47:05.539  INFO 16760 --- [MessageThread_5] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch-timeout
2024-10-14 14:47:05.539  INFO 16760 --- [MessageThread_2] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch-timeout

结果验证符合预期,如果想要验证超时效果,直接把超时时间设置的小一点即可,后面我会统一演示超时效果。

我们来演示一下超时效果,我们把超时时间修改为 10 毫秒时候,带超时时间同步消息发送测试结果如下:

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeoutat org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:667) ~[rocketmq-client-4.8.0.jar:4.8.0]at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343) ~[rocketmq-client-4.8.0.jar:4.8.0]at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:344) ~[rocketmq-client-4.8.0.jar:4.8.0]at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:555) ~[rocketmq-spring-boot-2.2.0.jar:2.2.0]at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:484) ~[rocketmq-spring-boot-2.2.0.jar:2.2.0]at com.order.service.rocketmq.producer.SyncMessageProducer.sendSyncMessageTimeout(SyncMessageProducer.java:57) ~[classes/:na]at com.order.service.controller.RocketMqController.sendSyncMessageTimeout(RocketMqController.java:47) ~[classes/:na]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.6.jar:5.3.6]at javax.servlet.http.HttpServlet.service(HttpServlet.java:626) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.6.jar:5.3.6]at javax.servlet.http.HttpServlet.service(HttpServlet.java:733) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93) ~[spring-boot-actuator-2.4.5.jar:2.4.5]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.45.jar:9.0.45]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_121]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_121]at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.45.jar:9.0.45]at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]

很明显提示超时了,超时测试依赖服务器的性能,因此如果想测试到理想的超时结果,建议将超时时间往小了设置。

RocketMQ 发送异步消息

发送异步消息是指生产者 Producer 向 Broker 发送消息,发送消息时指定消息发送成功及发送异常的回调方法,执行发送消息的 API 后立即返回,Producer 发送消息线程无需等待、不阻塞,对比同步消息,很明显异步消息的性能会更高,可靠性会略差,适用于对响应时间要求高的场景。

异步消息生产者代码如下:

package com.order.service.rocketmq.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;@Slf4j
@Component
public class AsyncMessageProducer {@Autowiredprivate RocketMQTemplate rocketMqTemplate;/*** @param message:* @date 2024/200/200 17:47* @description 异步消息发送*/public void sendAsyncMessage(String message) {rocketMqTemplate.asyncSend("async-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("普通异步消息发送成功");}@Overridepublic void onException(Throwable throwable) {log.info("普通异步消息发送失败");}});}/*** @param message:* @date 2024/200/200 17:47* @description 批量发送异步消息*/public void sendAsyncMessageBatch(String message) {Message<String> build = MessageBuilder.withPayload(message).build();List<Message<String>> msgList = new ArrayList<>();msgList.add(build);msgList.add(build);msgList.add(build);msgList.add(build);rocketMqTemplate.asyncSend("async-topic", msgList, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("批量异步消息发送成功");}@Overridepublic void onException(Throwable throwable) {log.info("批量异步消息发送失败");}});}/*** @param message:* @date 2024/200/200 17:47* @description 发送异步消息设置超时时间 超时时间 1毫秒*/public void sendAsyncMessageTimeout(String message) {//超时时间为 1 毫秒rocketMqTemplate.asyncSend("async-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("普通异步带超时消息发送成功");}@Overridepublic void onException(Throwable throwable) {log.info("普通异步带超时消息发送失败");}}, 200);}/*** @param message:* @date 2024/200/200 17:47* @description 批量发送异步消息 超时时间 1毫秒*/public void sendAsyncMessageBatchTimeout(String message) {Message<String> build = MessageBuilder.withPayload(message).build();List<Message<String>> msgList = new ArrayList<>();msgList.add(build);msgList.add(build);msgList.add(build);msgList.add(build);rocketMqTemplate.asyncSend("async-topic", msgList, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("批量异步带超时消息发送成功");}@Overridepublic void onException(Throwable throwable) {log.info("批量异步带超时消息发送失败");}}, 200);}}

异步消息消费者代码如下:

package com.order.service.rocketmq.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "async-group", topic = "async-topic")
public class AsyncMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("异步消息消费成功:{}", message);}
}

异步消息发送测试结果如下:

2024-10-14 15:15:08.215  INFO 16760 --- [ublicExecutor_1] c.o.s.r.producer.AsyncMessageProducer    : 普通异步消息发送成功
2024-10-14 15:15:08.222  INFO 16760 --- [MessageThread_1] c.o.s.r.consumer.AsyncMessageConsumer    : 异步消息消费成功:send-async-message

结果验证符合预期。

批量异步消息发送测试结果如下:

2024-10-14 15:15:39.681  INFO 16760 --- [ublicExecutor_2] c.o.s.r.producer.AsyncMessageProducer    : 批量异步消息发送成功
2024-10-14 15:15:39.682  INFO 16760 --- [MessageThread_2] c.o.s.r.consumer.AsyncMessageConsumer    : 异步消息消费成功:[{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}},{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}},{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}},{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}}]

结果验证符合预期。

异步带超时消息发送测试结果如下:

2024-10-14 15:16:20.643  INFO 16760 --- [ublicExecutor_3] c.o.s.r.producer.AsyncMessageProducer    : 普通异步带超时消息发送成功
2024-10-14 15:16:20.650  INFO 16760 --- [MessageThread_3] c.o.s.r.consumer.AsyncMessageConsumer    : 异步消息消费成功:send-async-message-timeout

结果验证符合预期。

批量异步带超时消息发送测试结果如下:

2024-10-14 15:16:43.326  INFO 16760 --- [ublicExecutor_4] c.o.s.r.producer.AsyncMessageProducer    : 批量异步带超时消息发送成功
2024-10-14 15:16:43.327  INFO 16760 --- [MessageThread_4] c.o.s.r.consumer.AsyncMessageConsumer    : 异步消息消费成功:[{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}},{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}},{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}},{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}}]

结果验证符合预期。

超时场景这里不再测试了,如果想验证超时效果,只需要将超时时间设置的尽可能小一点即可。

总结:本篇简单分享了 Spring 整合 RocketMQ,并完成单向消息、同步消息、异步消息的案例演示,在实际业务中只需要对案例代码进行丰富填充业务逻辑即可,希望可以帮助到大家,后面会持续分享延时消息、顺序消息、事务消息的使用案例。

如有不正确的地方欢迎各位指出纠正。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com