SpringAMQP
任何语言只要遵循AMAP协议,都可以与RabbitMQ交互
生产者、队列、交换机、消费者
生产者Publisher:生产消息,发给交换机
交换机Exchange:接收生产者发送的消息。知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
队列Queue:接收消息、缓存消息。队列一定要与交换机绑定。
消费者Consumer:订阅队列。
AMQP协议
Spring官方基于Rabbitmq提供了一套消息收发的模版工具:SpringAMQP
生产者
RabbitTemplate对象
convertAndSend方法
消费者
RabbitListener监听队列信息
WorkQueues模型
任务模型,简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
场景:当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work模型,让多个消费者共同处理消息,消息处理的速度就能大大提升。
Work模型总结
多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。
通过设置prefetch来控制消费者预取的消息数量
交换机Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
Fanout:广播,将消息交给所有绑定到交换机的队列。
Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列。
Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符。
Headers:头匹配,基于MQ的消息头匹配,用的较少。
常用的交换机类型:
Fanout:广播,将消息交给所有绑定到交换机的队列。
Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列。
Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符。
- # 匹配一个或多个词
- * 匹配不多不少恰好一个词
消息转换器convertAndSend
Spring的消息发送代码接收的消息体是一个Object
在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。
MQ高级
MQ消息的可靠性,发送失败,如何兜底?
1、发送者的可靠性
1.1 生产者重试机制
确保生产者一定把消息发到MQ
问题:生产者发送消息时,网络故障,导致与MQ的连接中断
解决方案:Spring AMQP提供的消息发送时的重试机制。
1.2 生产者确认机制
确保MQ不会将消息弄丢
问题:MQ内部处理消息的进程发生了异常;生产者发送消息到达MQ后未找到Exchange;生产者发送消息到达MQ的Exchange后,未找到合适Queue,无法路由。
解决方案:RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm和Public Return两种
1.3 实现生产者确认
确保消费者一定要处理消息
实现:开启生产者确认;定义ReturnCallback;定义ConfirmCallback
2、MQ的可靠性
2.1 数据持久化
交换机持久化;队列持久化;消息持久化
2.2 LazyQueue
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但某些特殊情况下,会导致消息积压,如:
消费者宕机或出现网络故障;消息发送量激增,超过了消费者处理速度;消费者处理业务发生阻塞。
从RabbitMQ的3.6版本开始,增加了Lazy Queues的模式(3.12版本后,LazyQueue成了所有队列的默认格式),也就是惰性队列。特征如下:
接收到消息后直接存入磁盘而非内存;
消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
支持数百万条的消息存储
3、消费者的可靠性
3.1 消费者确认机制
3.2 失败重试机制
3.3 失败处理策略
3.4 业务幂等性
3.5 兜底方案
思路:既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。
4、延迟消息
4.1 死信交换机和延迟消息
死信(符合下列情况之一):
- 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
总结:RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。
4.2 DelayExchange插件
死信队列虽然可以实现延迟消息,但是太麻烦。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。