您的位置:首页 > 娱乐 > 明星 > 公众号商城_企业网络拓扑图及说明_营销助手_石家庄seo排名公司

公众号商城_企业网络拓扑图及说明_营销助手_石家庄seo排名公司

2024/12/23 12:59:16 来源:https://blog.csdn.net/CSDN20221005/article/details/144229884  浏览:    关键词:公众号商城_企业网络拓扑图及说明_营销助手_石家庄seo排名公司
公众号商城_企业网络拓扑图及说明_营销助手_石家庄seo排名公司

Java客户端

快速入门

将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。

  1. 由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。
  2. 但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。
  3. 而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

在课前资料给大家提供了一个Demo工程,方便我们学习SpringAMQP的使用:

  • mq-demo:父工程,管理项目依赖
  • publisher:消息的发送者
  • consumer:消息的消费者

案例需求: 利用SpringAMQP完成消息的收发

在之前的案例中,我们都是经过交换机发送消息到队列,不过有时候为了测试方便,我们也可以直接向队列发送消息,跳过交换机。这种模式一般测试使用,很少在生产中使用。

在入门案例中,我们就演示这样的简单模型,如图:

也就是:

  • publisher直接发送消息到队列
  • 消费者监听并处理队列中的消息

  1. 在父工程中引入spring-amqp依赖,这样publisher和consumer服务都可以使用
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置rabbitmq服务端信息, 微服务才能连接到RabbitMO
spring:rabbitmq:host: 192.168.0.105 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

  1. 利用控制台创建队列simple.queue

  • 之前测试数据隔离时, 账号和队列都已经建好了
  • 账密: hmall/123

  1. 发送消息: 在publisher服务中,利用SpringAMQP提供的RabbitTemplate工具类, 发送信息

@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {//1.队列名String queueName = "simple.queue";//2.消息String message = "Hello,Spring AMQP!";//3.发送消息rabbitTemplate.convertAndSend(queueName, message);}}

  1. 监听队列: SpringAMQP提供声明式的消息监听,通过注解就可以把消息传递给当前方法

  • 消息发送时是什么类型, 接收时就用同类型接收, spring会自动处理

  1. 测试一下
  • 启动publisher(发送者)发送消息

  • 启动consumer(消费者)接受消息

WorkQueue

Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

模拟WorkQueue,实现一个队列绑定多个消费者

基本思路如下:

  1. 在RabbitMQ的控制台创建一个队列,名为work.queue

  1. 在publisher服务中定义测试方法,发送50条消息到work.queue
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testWorkQueue() {//1.队列名String queueName = "work.queue";for (int i = 1; i < 50; i++) {//2.消息String message = "Hello,Spring AMQP_" + i;//3.发送消息rabbitTemplate.convertAndSend(queueName, message);}}}
  1. 在consumer服务中定义两个消息监听者,都监听work.queue队列
@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) throws InterruptedException{System.out.println("消费者1收到消息:" + msg + "," + LocalTime.now());}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) throws InterruptedException{System.err.println("消费者2...收到消息:" + msg + "," + LocalTime.now());}}
  1. 重启代码, 查看执行结果

  • 默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。
  • 但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
  • 因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息
  1. 让消费者1每秒处理40条消息,消费者2每秒处理5条消息
  • 配置按需获取消息

  • 模拟消息处理能力差异

  • 重启测试: 处理能力越强, 处理的消息越多, 避免了消息堆积, 影响效率

Work模型的使用

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

Fanout交换机

交换机的作用主要是接收发送者发送的消息,并将消息路由到与其绑定的队列。

常见交换机的类型:

  • Fanout: 广播
  • Direct: 定向
  • Topic: 话题

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式

利用SpringAMQP演示FanoutExchange的使用

  1. 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2

  • 账号: hmall/123

  1. 在RabbitMO控制台中,声明交换机hmall.fanout,将两个队列与其绑定

  1. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {log.info("消费者1监听到 fanout.queue1的消息:{}", msg);}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) {log.info("消费者2监听到 fanout.queue2的消息:{}", msg);}}

  1. 在publisher中编写测试方法,向hmall.fanout发送消息

@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testFanoutQueue() {//1.交换机名称String exchangeName = "hmall.fanout";//2.消息String message = "Hello, everyone";//3.发送消息 参数: 交换机名称, RoutingKey(暂时未空), 消息rabbitTemplate.convertAndSend(exchangeName, null, message);}}

  1. 启动服务, 使用publisher服务发送消息, 在consumer服务监听消息

Direct交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

  • 每一个Queue都与Exchange设置一个BindingKey。
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

利用SpringAMQP演示DirectExchange的使用

  1. 在RabbitMQ控制台中,声明队列direct.queue1和direct.quque2

  1. 在RabbitMO控制台中,声明交换机hmall.direct,将两个队列与其绑定

  1. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
@Slf4j
@Component
public class SpringRabbitListener {    @RabbitListener(queues = "direct.queue1")public void listenDirectQueue1(String msg) {log.info("消费者1监听到 direct.queue1的消息:{}", msg);}@RabbitListener(queues = "direct.queue2")public void listenDirectQueue2(String msg) {log.info("消费者2监听到 direct.queue2的消息:{}", msg);}
}
  1. 在publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testDirectQueue() {//1.队列名String exchangeName = "hmall.direct";//2.消息String message = "红色: 震惊, 居然卡爆了";//3.发送消息 参数: 交换机名称, RoutingKey, 消息rabbitTemplate.convertAndSend(exchangeName, "red", message);}}
  1. 启动服务, 发消息测试

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同RoutingKey,则与Fanout功能类似

Topic交换机

TopicExchange也是基于RoutingKey做消息路由,但是routingKey通常是多个单词的组合,并且以 . 分割

Topic交换机与队列绑定时, BindingKey可以使用通配符:

#: 代指0个或多个单词

*: 代指一个单词

利用SpringAMQP演示DirectExchange的使用

  1. 在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2

  1. 在RabbitMQ控制台中,声明交换机hmall.topic,将两个队列与其绑定

  1. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "topic.queue1")public void listenTopicQueue1(String msg) {log.info("消费者1监听到 topic.queue1的消息:{}", msg);}@RabbitListener(queues = "topic.queue2")public void listenTopicQueue2(String msg) {log.info("消费者2监听到 topic.queue2的消息:{}", msg);}}
  1. 在publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testTopicQueue() {//1.队列名String exchangeName = "hmall.topic";//2.消息String message = "都能收到的消息";//3.发送消息 参数: 交换机名称, RoutingKey, 消息
//        rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
//        rabbitTemplate.convertAndSend(exchangeName, "china.info", message);rabbitTemplate.convertAndSend(exchangeName, "test.news", message);}}
  1. 启动服务进行测试: 根据不同RoutingKey, 交换机把消息分发给不同的消费者

描述下Topic交换机相比Direct交换机的优势

  1. 实现的功能时类似的
  2. 在绑定bindingKey时可以使用通配符, 扩展性和灵活性更好

声明队列交换机

前面我们使用过web控制台的方式创建交换机和队列, 并进行绑定, 实际工作中, 需要使用java代码自动创建

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  1. SpringAMQP提供了Queue类, 用于声明队列

  1. SpringAMQP提供了Exchange接口,来表示所有不同类型的交换机:

  1. 我们可以自己创建队列和交换机,SpringAMQP还提供了ExchangeBuilder来简化这个过程

  1. Binding类: 用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

fanout示例

  1. 先删掉前面创建的hmall.fanout交换机, 以及队列fanout.queue1/fanout.queue2
  2. 在consumer中创建一个类, 声明一个Fanout类型的交换机,并且创建队列与其绑定
  • 创建工作一般放在消费者服务, 因为消息发送者只负责发消息,
  • 消费者接收消息就要考虑交换机和队列, 以及绑定关系

@Configuration
public class FanoutConfiguration {// 声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange() {// 使用构建器创建//  return ExchangeBuilder.fanoutExchange("hmall.fanout").build();// 手动创建return new FanoutExchange("hmall.fanout");}// 声明队列1@Beanpublic Queue fanoutQueue1() {// 使用构建器创建//  return QueueBuilder.durable("fanout.queue1").build();// 手动创建return new Queue("fanout.queue1");}// 绑定队列1和交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 声明队列2@Beanpublic Queue fanoutQueue2() {return new Queue("fanout.queue2");}// 绑定队列1和交换机@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
  1. 重启服务: 队列和交换机已经自动创建, 绑定关系也有

direct示例

  1. 先删掉前面创建的hmall.direct交换机, 以及队列direct.queue1/direct.queue2
  2. 利用SpringAMQP声明DirectExchange并与队列绑定
  3. direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding

@Configuration
public class DirectConfiguration {/*** 声明交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build();}/*** 创建队列1*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 创建队列2*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}
  1. 执行代码: 交换机, 队列以及绑定关系已经创建

SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:

@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenDirectQueue1(String msg) {log.info("消费者1监听到 direct.queue1的消息:{}", msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenDirectQueue2(String msg) {log.info("消费者2监听到 dirce.queue2的消息:{}", msg);}}

消息转换器

Spring的消息发送代码接收的消息体是一个Object:

在数据传输时,它会把发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

我们来测试一下。

  1. 声明一个队列,名为object.queue

  1. 编写单元测试,向队列中直接发送一条消息, 消息类型为Map
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendObject() {//1.准备消息Map<String, Object> msg = new HashMap<>(2);msg.put("name", "tom");msg.put("age", 18);//2.发送消息rabbitTemplate.convertAndSend("object.queue", msg);}}
  1. 在控制台查看消息

  1. Spring的消息对象的处理默认是基于JDK的ObjectOutputStream完成序列化。存在明显的问题:
  • ·JDK的序列化有安全风险
  • JDK序列化的消息太大
  • JDK序列化的消息可读性差

建议采用JSON序列化代替默认的JDK序列化,要做两件事情

  1. 在publisher和consumer中都要引入jackson依赖:
  • 我们直接在父工程中引入, 避免重复引入
  • 引入后刷新一下mave, 确保子工程加载依赖

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
  1. 配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean
@SpringBootApplication
public class PublisherApplication {public static void main(String[] args) {SpringApplication.run(PublisherApplication.class);}@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}
  1. 消费者接收Object
  • 在consumer服务中定义一个新的消费者
  • publisher是用Map发送,那么消费者也一定要用Map接收
@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "object.queue")public void listenObjectQueue(Map<String, Object> msg) {log.info("消费者监听到 Object.queue的消息:{}", msg);}}
  1. 重启服务, 查看消息

业务改造

需求 :改造余额支付功能,不再同步调用交易服务的OpenFeign接口,而是采用异步MQ通知交易服务更新订单状态

  1. 配置MQ: 不管是生产者(pay-service) 还是 消费者(trade-service),都需要配置MQ的基本信息。

引入依赖

  <!--消息发送--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

配置地址

spring:rabbitmq:host: 192.168.1.97 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

配置消息转换器

  • 因为多个服务都要使用mq, 所以每个服务都配置消息转换器太麻烦, 就在commom服务中进行配置

@Configuration
public class MqConfig {@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}
  • 配置类生效的前提是让spring扫描器扫描到, 在factories文件中指定文件, 让配置文件生效

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.hmall.common.config.MyBatisConfig,\com.hmall.common.config.MvcConfig,\com.hmall.common.config.MqConfig,\com.hmall.common.config.JsonConfig

  1. 接收消息: 在trade-service服务中定义一个消息监听类

@Component
@RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "trade.pay.success.queue", durable = "true"),exchange = @Exchange(name = "pay.direct"),key = "pay.success"))public void listenPatSuccess(Long orderId) {orderService.markOrderPaySuccess(orderId);}
}

  1. 发送消息: 修改pay-service服务下的PayOrderServiceImpl类中的tryPayOrderByBalance方法
@Slf4j
@Service
@RequiredArgsConstructor
public class PayOrderServiceImpl extends ServiceImpl<PayOrderMapper, PayOrder> implements IPayOrderService {private final RabbitTemplate rabbitTemplate;@Override@Transactionalpublic void tryPayOrderByBalance(PayOrderFormDTO payOrderFormDTO) {... ...// 5.修改订单状态// tradeClient.markOrderPaySuccess(po.getBizOrderNo());try {rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());} catch (Exception e) {log.error("发送支付状态通知失败, 订单id: {}", po.getBizOrderNo(), e);}}
}
  • 异步通知尽量不要对原有业务产生影响, 简单的处理就是使用try捕获异常

  1. 重启服务, 进行测试

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com