您的位置:首页 > 房产 > 家装 > 北京室内设计公司排行_seo教程网站_搜索引擎优化方案_seo包年优化

北京室内设计公司排行_seo教程网站_搜索引擎优化方案_seo包年优化

2025/4/25 11:52:51 来源:https://blog.csdn.net/L3526581402/article/details/147076107  浏览:    关键词:北京室内设计公司排行_seo教程网站_搜索引擎优化方案_seo包年优化
北京室内设计公司排行_seo教程网站_搜索引擎优化方案_seo包年优化

1.概述

RabbitMQ作为消息队列,有6种队列模型,分别在不同的场景进行使用,分别是Hello World,Work queues,Publish/Subscribe,Routing,Topics,RPC。

下面就分别对几个模型进行讲述。

2.Hello World

这个模型也叫直连模型,从这个名字来看就知道它的原理很简单,是一个线性的且没有分支的模型。

生产者生产消息,把消息给队列,队列在把消息给消费者进行消费。

代码实现

工具类

public class RabbitMQUtils {public static Connection getConnection() throws IOException, TimeoutException {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.153.132");//设置rabbitmq的主机地址factory.setPort(5672);//设置rabbitmq的端口号factory.setUsername("admin");//设置用户名factory.setPassword("admin");//设置密码factory.setVirtualHost("/abc");//设置虚拟主机//根据连接工厂获取一个连接对象Connection connection = factory.newConnection();return connection;}
}

 生产者

/*** 基于HelloWorld消息模型的消息生产者*/
public class MessageProvider1 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection= RabbitMQUtils.getConnection();//根据连接对象创建一个Channel(信道)Channel channel = connection.createChannel();/*** 根据Channel声明一个队列* 参数1:队列的名称* 参数2:描述队列是否持久化(true 讲队列以文件的形式保存在硬盘上,下一次启动mq服务还可以看到该队列)* 参数3:是否独占队列(true 只有当前会话才能使用该队列)* 参数4:队列是否会自动删除* 参数5:对队列进行的额外设置*/channel.queueDeclare("hello", true, false, false, null);/*** 发布消息* 参数1:交换机的名称* 参数2:队列的名称* 参数3:消息进行额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN消息持久化* 参数4:消息主体,以字节数组的方式进行消息的发送*/channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());//关闭资源channel.close();connection.close();}
}

消费者部分 

/*** 基于HelloWorld消息模型的消息消费者*/
public class MessageConsumer1 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection= RabbitMQUtils.getConnection();Channel channel = connection.createChannel();//声明一个队列channel.queueDeclare("hello", true, false, false, null);/*** 消息的消费* 参数1:队列的名称* 参数2:是否开启自动确认机制 true 开启* 参数3:处理队列里面消息的回调函数*/channel.basicConsume("hello",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body);System.out.println(msg);}});}
}

 这时候先启动消费者等待生产者的消息,然后再开启生产者,那么消费者就会接收到消息,这里是”Hello rabbit“

这里的队列名称要自己再RabbitMQ的管理界面进行创建

在这个界面可以进行手动创建

3.Work queues

它与第一种模型不同的是,消息队列不再是单一的讲消息传送给一个消费者,而是可以传送给多个消费者。它就解决了生产者生产消息的速率大于消费者消费消息的速率,使消息不断地堆积在队列中。

同样的它的模型也很简单,也是生产消息给队列,队列再将消息分给队列。

代码实现

生产者

/*** 消息的生产者 -- 工作队列实现*/
public class MessageProvider2 {public static void main(String[] args) throws Exception {Connection connection = RabbitMQUtils.getConnection();// 创建信道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare("work", false, false, false, null);// 发送消息for (int i = 1; i <= 10; i++) {String message = "hello,SpringCloud" + i;channel.basicPublish("", "work", null, message.getBytes());}// 关闭资源channel.close();connection.close();}
}

消费者 

public class MessageConsumer_q2 {public static void main(String[] args) throws Exception {Connection connection = RabbitMQUtils.getConnection();final Channel channel = connection.createChannel();//声明队列channel.queueDeclare("work",false,false,false,null);//消费消息 参数2 false 关闭消息的自动确认机制channel.basicConsume("work",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息是:" + new String(body));}});}
}

消费者的代码都一样的,就不重复写了。

但现在要思考一个问题,就是我这里的代码是开启了消息的自动确认机制,就是意思是队列会将消息按照顺序平均分配给消费者,也就是消费者会先再队列中确认消息(确认意味着消息从队列中删除),这样就会出现一个问题,有的消费者消费得快,有得消费者消费得慢,这样得结果显然不是我们想要的,我们肯定希望能者多劳,也就是按需分配,而不是按量平均分配 。还有一个问题,就是消费者一下子确认这么多消息然后慢慢消费,万一某个消费者在消费的时候挂掉了,这些消息都已经不在队列中了,那不就直接导致消息丢失了。

因此我们需要对消息进行手动确认而不是自动确认

public class MessageConsumer_q1 {public static void main(String[] args) throws Exception {Connection connection = RabbitMQUtils.getConnection();final Channel channel = connection.createChannel();//声明队列channel.queueDeclare("work", false, false, false, null);//消费消息  第二个参数改成false,即关闭自动确认channel.basicConsume("work", false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 开启手动确认 不再同时确认多条消息channel.basicAck(envelope.getDeliveryTag(), false);System.out.println("消息是:" + new String(body));}});}
}

4.fanout

这个模型就相对复杂点,多了一个交换机。

广播类型的交换机可以和任意类型的队列进行配对。

每个消费者有自己的队列。

每个队列都要绑定到交换机.

生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

交换机把消息发送给绑定过的所有队列 队列的消费者都能拿到消息。

实现一条消息被多个消费者消费

代码实现

生产者

public class MessageProvider3 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机的名称* 参数2:交换机的类型  fanout广播类型的交换机(可以和任何队列进行绑定)*/channel.exchangeDeclare("exchange_fanout","fanout");/*** 发布消息* 参数1:交换机的名称* 参数2;路由key 由于是广播类型的交换机,所以不用定义路由key* 参数3:对消息进行的额外设置* 参数4:消息主体*/channel.basicPublish("exchange_fanout","",null,"hello rabbitmq".getBytes());}
}

 消费者

public class MessageConsumer3 {public static void main(String[] args) throws Exception {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare("exchange_fanout", "fanout");//声明队列channel.queueDeclare("q1", false, false, false, null);//将交换机和队列进行绑定 参数1: 队列的名称 参数2: 交换机的名称 参数3: 路由keychannel.queueBind("q1", "exchange_fanout", "");//消费消息channel.basicConsume("q1", true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}});}
}

一条消息会被所有消费者消费 

5.Direct

在前面的模型中,一条消息被所有的消费者消费,但是在某些特定的场景下,我们希望不同的消息被不同的队列消费

生产者向交换机发送消息时,会指定一个routing key

交换机在接收到消息时会根据这个key来对它绑定的队列的routing key进行匹配,匹配的才会发送过去。

代码实现

生产者

/*** 消息的生产者 -- direct消息模型*/
public class MessageProvider {public static void main(String[] args) throws Exception {// 获取与RabbitMQ的连接Connection connection = RabbitMQUtils.getConnection();// 创建信道Channel channel = connection.createChannel();// 声明交换机,名称为exchange_direct,类型为directchannel.exchangeDeclare("exchange_direct", "direct");// 发布消息,发送到exchange_direct交换机,routingKey为add,消息内容为"hello, this is direct message"channel.basicPublish("exchange_direct", "add", null, "hello, this is direct message".getBytes());// 关闭信道channel.close();// 关闭连接connection.close();}
}

消费者1

 

/*** 消息的消费者1 -- direct类型*/
public class ConsumerProvider_1 {public static void main(String[] args) throws Exception {// 获取与RabbitMQ的连接Connection connection = RabbitMQUtils.getConnection();// 创建信道Channel channel = connection.createChannel();// 声明交换机,名称为exchange_direct,类型为directchannel.exchangeDeclare("exchange_direct", "direct");// 声明队列,名称为queue_directchannel.queueDeclare("queue_direct", false, false, false, null);// 使用routingKey绑定队列和交换机,分别绑定add、update的routingKeychannel.queueBind("queue_direct", "exchange_direct", "add");channel.queueBind("queue_direct", "exchange_direct", "update");// 消费消息,设置为自动确认channel.basicConsume("queue_direct", true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印接收到的消息内容System.out.println(new String(body));}});}
}

消费者2

 

/*** 消息的消费者2 -- direct类型*/
public class ConsumerProvider_2 {public static void main(String[] args) throws Exception {// 获取与RabbitMQ的连接Connection connection = RabbitMQUtils.getConnection();// 创建信道Channel channel = connection.createChannel();// 声明交换机,名称为exchange_direct,类型为directchannel.exchangeDeclare("exchange_direct", "direct");// 声明队列,名称为queue_direct1channel.queueDeclare("queue_direct1", false, false, false, null);// 使用routingKey绑定队列和交换机,分别绑定update、delete的routingKeychannel.queueBind("queue_direct1", "exchange_direct", "update");channel.queueBind("queue_direct1", "exchange_direct", "delete");// 消费消息,设置为自动确认channel.basicConsume("queue_direct1", true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印接收到的消息内容System.out.println(new String(body));}});}
}

消费者1的队列与交换机绑定的routing key分别是 add、update

消费者2的队列与交换机绑定的routing key分别是update、delete

示例代码中的生产者生产的消息的routing key是add,那么只有消费者1能接收到消息。

6.Topics

这个模型与Direct的原理是一样的,差异就在于它的routing key是有模糊匹配的机制。

但是它的routing key一般是由多个单词拼在一起,中间由”.“进行隔开,例如item.insert

这里的通配符有两种

分别是*和#

*可以匹配一个单词

#可以匹配一个或多个单词

代码演示

生产者

public class MessageProvider5 {public static void main(String[] args) throws Exception {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();//声明topic类型的交换机channel.exchangeDeclare("topic_exchange", "topic");//发布消息channel.basicPublish("topic_exchange","user.product.add",null,"hello,this is topic message".getBytes());//关闭资源channel.close();connection.close();}
}

消费者1 

public class MessageConsumer5_1 {public static void main(String[] args) throws Exception {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare("topic_exchange", "topic");//声明队列channel.queueDeclare("topic_q1", false, false, false, null);//将队列和交换机进行绑定channel.queueBind("topic_q1", "topic_exchange", "user.*");//消费消息channel.basicConsume("topic_q1", true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}});}
}

消费者2

 

public class MessageConsumer5_2 {public static void main(String[] args) throws Exception {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare("topic_exchange", "topic");//声明队列channel.queueDeclare("topic_q2", false, false, false, null);//将队列和交换机进行绑定channel.queueBind("topic_q2", "topic_exchange", "user.#");//消费消息channel.basicConsume("topic_q2", true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}});}
}

7.RPC(了解)

  1. 客户端
    • 与 RabbitMQ 建立连接并创建信道。
    • 声明一个用于接收响应的回调队列。
    • 生成一个唯一的 correlationId,用于匹配请求和响应。
    • 发送请求消息到 RPC 队列,同时设置 replyTo 为回调队列名称,correlationId 为生成的唯一标识。
    • 从回调队列接收响应消息,并根据 correlationId 进行匹配。
  2. 服务端
    • 与 RabbitMQ 建立连接并创建信道。
    • 声明 RPC 队列。
    • 从 RPC 队列接收请求消息。
    • 处理请求消息,生成响应结果。
    • 将响应消息发送到 replyTo 指定的回调队列,并带上相同的 correlationId

 

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com