您的位置:首页 > 娱乐 > 八卦 > Nodejs 第七十七章(MQ高级)

Nodejs 第七十七章(MQ高级)

2024/10/7 0:19:29 来源:https://blog.csdn.net/qq1195566313/article/details/139584928  浏览:    关键词:Nodejs 第七十七章(MQ高级)

在这里插入图片描述

MQ介绍和基本使用在75章介绍过了,不再重复

MQ高级用法-延时消息

什么是延时消息?

Producer 将消息发送到 MQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息

插件安装

RabbitMQ延迟队列插件下载

下载地址 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

安装

把下载好的文件拖到你的rabbitMQ下面的plugins目录里面

#举例
D:\Applaaction\rabbitmq_server-3.13.0\plugins

启用插件

执行下面的命令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

检查是否成功

打开可视化面板(可视化面板如何安装75章有讲)

访问 http://localhost:15672/#/ 账号密码都是 guest

发现新增了一个延迟队列类型 x-delayed-message

代码编写

应用场景

现在是2024-06-06 半夜1.08分,我选择外卖预约中午的11.00 - 11.20 左右的外卖,我如果选择下单,那么这个单不会立马推送到商家的客户端里面,而是存放到消息队列,使用延时消息,在差不多的时间段例如10.30左右才会把这个单推送到商家的客户端,这样商家出餐10分钟,骑手送20-30分钟左右,送过来就差不多11点左右

生产者

发布订阅模式在上一章已经讲过了不懂去看上一章

  1. 我们使用新增的延时类型切换一下type类型 x-delayed-message
  2. 连接交换机的时候增加arguments对象 添加 x-delayed-type 目标交换机类型 这儿使用direct
  3. 发布消息的时候增加头部信息 x-delay:延时的时间(毫秒)
import amqplib from 'amqplib'
//1.连接MQ
const connection = await amqplib.connect('amqp://localhost:5672')
//2.创建一个通道
const channel = await connection.createChannel()
//3.创建交换机
/*** @param {string} exchange 交换机名称 随便写* @param {string} type 交换机类型 direct fanout topic headers x-delayed-message* @param {options} options 可选配置项*/
//这个方法就是说如果你创建过这个交换机就不会再创建了 如果没有创建过这个交换机就会创建
await channel.assertExchange('delayed-1', 'x-delayed-message',{arguments:{'x-delayed-type': 'direct' //目标交换机类型}
})//4.发送消息
/*** @param {string} exchange 要发送到交换机的名称* @param {string} routingKey 匹配路由的key* @param {Buffer} buffer 要发送的消息* @param {options} options 可选配置项*/
channel.publish('delayed-1', 'time', Buffer.from('延时消息'),{headers:{'x-delay': 10000 //延时 10秒}
})
//断开连接
await channel.close()
await connection.close()
process.exit(0)

消费者

import amqplib from 'amqplib'
//1.连接MQ
const connection = await amqplib.connect('amqp://localhost:5672')
//2.创建一个通道
const channel = await connection.createChannel()
//3.创建交换机
await channel.assertExchange('delayed-1', 'x-delayed-message',{arguments:{'x-delayed-type': 'direct' //目标交换机类型}
})
//4.创建一个队列
const { queue } = await channel.assertQueue('queue-1')
//5.交换机跟队列要绑定
/*** @param {string} queue 队列名称* @param {string} exchange 交换机名称* @param {string} routingKey 匹配路由的key*/
channel.bindQueue(queue, 'delayed-1', 'time')
//6.消费消息
channel.consume(queue, (msg) => {console.log(msg.content.toString())
}, {noAck: true
})

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com