您的位置:首页 > 科技 > 能源 > rust操作rabbitmq

rust操作rabbitmq

2024/9/24 1:19:00 来源:https://blog.csdn.net/A_super_C/article/details/141283377  浏览:    关键词:rust操作rabbitmq

Rust 操作 Rabbitmq

使用docker快速部署rabbitmq

docker pull rabbitmq:management
# 15672为rabbitmq 管理员端口,默认账号密码为guest(账号密码相同)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

rust 添加amqp库lapin

cargo add lapin

1. 连接到rabbitmq

let conn=lapin::Connection::connect("amqp://localhost:5672",lapin::ConnectionProperties::default(),).await?;
let chan=conn.create_channel().await?;

2. 交换机创建和队列创建

//创建一个名为itest的交换机,模式为话题模式
chan.exchange_declare("itest",lapin::ExchangeKind::Topic,lapin::options::ExchangeDeclareOptions::default(),lapin::types::FieldTable::default(),
)
.await?;
//创建一个名为queue1的队列
chan.queue_declare("queue1",lapin::options::QueueDeclareOptions::default(),lapin::types::FieldTable::default(),
)
.await?;
//绑定队列到交换机,将名为队列queue1绑到交换机itest,并设置路由名为/queue1
chan.queue_bind("queue1","itest","/queue1",lapin::options::QueueBindOptions::default(),lapin::types::FieldTable::default(),
).await?;

3. 生产者发布消息

// 发送给itest交换机,交换机会把消息交给路由/queue1
chan.basic_publish("itest","/queue1",lapin::options::BasicPublishOptions::default(),"hello".as_bytes(),lapin::BasicProperties::default(),
).await.expect("publish message failed");

4. 消费者订阅消息

let consumer = chan.basic_consume("queue1","",lapin::options::BasicConsumeOptions::default(),lapin::types::FieldTable::default(),).await?;
consumer.set_delegate(|d: lapin::message::DeliveryResult| async move {match d {Err(err) => eprintln!("subscribe message error {err}"),Ok(data) => {if let Some(data) = data {let raw = data.data.clone();let f = data.ack(lapin::options::BasicAckOptions::default());println!("accept msg {}",String::from_utf8(raw).expect("parse msg failed"));if let Err(err) = f.await {eprintln!("ack failed {err}");}}}}
});

最终demo

#[cfg(test)]
mod mq{#[tokio::test]async fn rabbitmq() -> Result<(), Box<dyn std::error::Error>> {//连接到rabbitmqlet conn = lapin::Connection::connect("amqp://localhost:5672",lapin::ConnectionProperties::default(),).await?;let chan = conn.create_channel().await?;//初始化queue和exchangechan.queue_declare("queue1",lapin::options::QueueDeclareOptions::default(),lapin::types::FieldTable::default(),).await?;chan.exchange_declare("itest",lapin::ExchangeKind::Topic,lapin::options::ExchangeDeclareOptions::default(),lapin::types::FieldTable::default(),).await?;chan.queue_bind("queue1","itest","/queue1",lapin::options::QueueBindOptions::default(),lapin::types::FieldTable::default(),).await?;//发送消息tokio::spawn(async move {chan.basic_publish("itest","/queue1",lapin::options::BasicPublishOptions::default(),"hello".as_bytes(),lapin::BasicProperties::default(),).await.expect("publish message failed");});let chan = conn.create_channel().await?;let consumer = chan.basic_consume("queue1","",lapin::options::BasicConsumeOptions::default(),lapin::types::FieldTable::default(),).await?;//使用回调来触发接受到新消息时的操作,使用futures_lite 中StreamExt 可以不使用回调consumer.set_delegate(|d: lapin::message::DeliveryResult| async move {match d {Err(err) => eprintln!("subscribe message error {err}"),Ok(data) => {if let Some(data) = data {let raw = data.data.clone();let f = data.ack(lapin::options::BasicAckOptions::default());println!("accept msg {}",String::from_utf8(raw).expect("parse msg failed"));if let Err(err) = f.await {eprintln!("ack failed {err}");}}}}});tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;Ok(())}
}

结果展示

请添加图片描述
rabbitmq 管理后台页面可以看到我们创建的itest交换机和queue1队列向绑定,queue1的路由地址为/queue1
在这里插入图片描述

简言

amqp 包其实无论是rust 的lapin还是golang的streadway/amqp,操作手法整体都是一样的,rabbitmq其它几种模式可以参考我goalng 的rabbitmq几种模式下操作方式来类推

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com