您的位置:首页 > 文旅 > 美景 > 哪家公司网站制作好_建网站需要多久_沧州网络推广公司_东莞网站优化

哪家公司网站制作好_建网站需要多久_沧州网络推广公司_东莞网站优化

2025/1/4 7:47:24 来源:https://blog.csdn.net/2301_76161469/article/details/143944623  浏览:    关键词:哪家公司网站制作好_建网站需要多久_沧州网络推广公司_东莞网站优化
哪家公司网站制作好_建网站需要多久_沧州网络推广公司_东莞网站优化

目录

工作模式

Simple(简单模式)

Work Queue(工作队列)

Publish/Subscribe(发布/订阅)

Exchange(交换机) 

Routing(路由模式)

Topics(通配符模式)

RPC(RPC通信)

Publisher Confirms(发布确认)

代码实现

Simple(简单模式)

生产者代码

消费者代码

Work Queues(工作队列)

生产者代码

消费者代码

Publish/Subscribe(发布/订阅)

生产者代码

消费者代码

Routing(路由模式)

生产者代码

消费者代码

Topics(通配符模式)

生产者代码

消费者代码

RPC(RPC通信)

客户端代码

服务端代码

Publisher Confirms(发布确认)

Publishing Messages Individually(单独确认)

Publishing Messages in Batches(批量确认)

Handling Publisher Confirms Asynchronously(异步确认)


RabbitMQ 共提供了 7 种工作模式进行消息传递:

在本篇文章中,我们就来学习 RabbitMQ 的工作模式,我们首先来了解这 7 种工作模式分别是怎样的

工作模式

Simple(简单模式)

P 表示 生产者,是消息的发送方

C 表示 消费者,是消息的接收者

Queue:表示 消息队列,用于缓存消息,生产者生产的消息发送到队列中,消费者从队列中取出消息

简单模式下,只有一个生产者和一个消费者,生产者生产的消息存储到队列中后,都由这个消费者消费

特点:一个生产者 P,一个消费者 C,消息只能被消费一次,也称为 点对点(Point-to-Point)模式

适用场景:消息只能被单个消费者处理

在 RabbitMQ 入门中的入门代码的工作模式就是简单模式

Work Queue(工作队列)

此时有 一个生产者和多个消费者

当生产者向队列中发送多条消息后,Work Queue 会将消息分配给不同的消费者,每个消费者接收到的消息不同,由多个消费者共同消费生产者生产的消息

例如:

由 A (生产者)发送不同消息,消息存储到 RabbitMQ 中,接着,由 B(消费者1) 和 C(消费者2) 共同消息 A 发送的消息,此时,RabbitMQ 选择将第一条消息分配给 B,B 消费第一条消息,RabbitMQ 将第二条消息分配给 C,C 消费第二条消息......

B 和 C 接收到的消息是不同的,这两个消费者共同消费 A 发送的所有消息

特点:消息不会重复,分配给不同的消费者

适用场景:集群环境中实现异步处理

Publish/Subscribe(发布/订阅)

其中,X 表示的是 交换机,在 发布/订阅 模式中,多了 Exchange 角色,因此,我们先来学习交换机相关知识

Exchange(交换机) 

Exchange(交换机)的作用是 接收生产者发送的消息,并将消息按照一定的规则路由到一个或多个队列中

生产者的消息都会先发送到交换机,然后再由交换机将消息路由到队列中

在前面 简单模式 工作队列模式 下,图中都没有出现交换机,但实际上,生产者生产的消息都是先发送到交换机,然后再路由到队列中的。在前两种模式下,直接使用 RabbitMQ 提供的内置交换机就可以实现,因此,并没有突出交换机的存在,但实际上生产者生产的消息不会直接投递到队列中

在 RabbitMQ 中,交换机有 4 种类型:FanoutDirectTopic Headers,不同的类型有着不同的路由策略

AMQP 协议中还有两种类型,System 自定义,在这里,我们并不重点关注

Fanout:广播,将消息交给所有绑定到交换机的队列(Publish / Subscribe 模式

Direct:定向,将消息交给符合指定 routing key 的队列(Routing 模式

Topic:通配符,将消息交给符合 routing patterm(路由模式)的队列(Topics 模式

Headers:Headers 类型的交换机通过消息头部的属性来路由消息,而不依赖路由键的匹配规则来路由消息。根据发送的消息内容中的 headers 属性进行匹配,headers 类型的交换机性能会较差,因此也不太实用,基本上也不会进行使用

Exchange(交换机)只负责转发消息,并不具备存储消息的能力,因此,若是没有任何队列与 Exchange 绑定,或是没有符合路由规则的队列,消息就会丢失

接下来,我们来看 RoutingKey BindingKey

RoutingKey:路由键,当生产者将消息发送给交换机时,会指定一个字符串,用于告诉交换机如何处理这个消息

BindingKey:绑定,RabbitMQ 中通过 Binding(绑定)将交换机与队列关联起来,在绑定时会指定一个 Binding Key,这样 RabbitMQ 就知道如何正确地将消息路由到队列

即,绑定时,需要的路由键是 BindingKey;发送消息时,需要的路由键是 RoutingKey

例如:

使用 BindingKey1 将交换机与 队列1 进行绑定,使用 BindingKey2 将交换机与 队列2 进行绑定

若在发送消息时,若设置 Routing Key 设置为 BindingKey1,交换机就会将消息路由到 队列1

即,当消息的 RoutingKey 与队列绑定的 BindingKey 相匹配时,消息才会被路由到这个队列中

其实,BindingKey 也属于路由键的一种,即,在绑定时使用的路由键,有时,也会使用 RoutingKey 表示 BindingKey,即使用 RoutingKey 表示 BindingKey 和 RoutingKey,因此,我们需要根据其使用场景进行区分

在了解了相关概念之后,我们继续看 Publish/ Subscribe 模式

上述有一个生产者 P,多个消费者 C1、C2,X 表示交换机,交换机将消息复制多份每个消费者接收到相同的消息

也就是说,生产者发送一条消息,经过交换机转发到不同的队列,不同的消费者从不同的队列中取出消息进行消费

特点:不同的消费者接收到的消息是相同的

适用场景:消息需要被多个消费者同时接收,如:实时通信或广播消息

Publish/Subscribe(发布/订阅)模式 与 Work Queue(工作队列)模式 最大的区别就是:发布/订阅 模式下,不同消费者接收到的消息是相同的;而 工作队列 模式下,不同消费者接收到的消息是不同的

Routing(路由模式)

路由模式可以看做是 发布订阅模式 的变种,其在发布订阅模式的基础上,增加了路由 key

发布订阅模式会无条件的将所有消息发送给所有消费者,而路由模式下,交换机会根据 RoutingKey 的规则,将数据筛选后发送给对应的消费者队列

也就是说,只有满足条件的队列才会收到消息

如上图所示,Q1 通过 a 与交换机进行绑定,Q2 通过 a、b 和 c 与交换机进行绑定

当 P (生产者)在发送消息时,若设置 Routing Key 设置为 a,则此时 Q1 和 Q2 的 BindingKey 都与其相匹配,消息就会被路由到 Q1 和 Q2 中

而当 P 发送消息时,设置 Routing Key 设置为 b,此时,只有 Q2 的 BindingKey 与其相匹配,消息也就只会被路由到 Q2 中

适用场景:需要根据特定规则分发消息

Topics(通配符模式)

通配符模式,则是 路由模式 的变种,在 RoutingKey 的基础上,增加了 通配符 的功能,使得匹配更加灵活

Topics 和 Routing 的基本原理相同,即:生产者将消息发送给交换机,交换机根据 RoutingKey 将消息转发给与 RoutingKey 匹配的队列

而不同的是,Routing 模式下,需要 RoutingKey 和 BingingKey 完全匹配;而 Topics 模式下,则是通配符匹配

在 BindingKey 中,存在两种特殊的字符串,用于模糊匹配

* :表示能够匹配任意一个单词

#:表示能够匹配任意多个单词(可以为 0 个)

Q1 通过 *.a.* 与交换机进行绑定,Q2 通过 *.*.b 和 c.# 与交换机进行绑定

当 P (生产者)在发送消息时,若设置 Routing Key 设置为 work.a.b,则此时 Q1 和 Q2 的 BindingKey 都能够与其相匹配,消息就会被路由到 Q1 和 Q2 中

而当 P 发送消息时,设置 Routing Key 设置为 a.a.a,此时,只有 Q1 的 BindingKey 与其相匹配,消息也就只会被路由到 Q1 中

适用场景:需要灵活匹配和过滤消息的场景

RPC(RPC通信)

RPC 通信过程中,没有生产者和消费者,而是通过两个队列实现了一个可回调的过程

例如:

客户端(Client)发送请求消息到指定队列(rpc_queue),并在消息属性中设置 reply_to 字段,这个字段指定了一个回调队列(amq.gen-Xa2...),这个回调队列用于接收服务端的响应消息

服务器(Server)从队列 rpc_queue 中取出请求消息,处理请求后,将响应消息发送到 reply_to 指定的回调队列(amq.gen-Xa2...)

客户端(Client)在回调队列上等待响应消息,一旦接收到响应,客户端就会检查消息的 correlation_id 属性,确保其是所期望的响应

简而言之,客户端将请求消息发送到 队列Q1 中,服务器从 Q1 中取出请求消息进行处理,然后将响应消息发送到 队列Q2 中,客户端从 Q2 中读取响应消息

从而实现了 客户端向服务器发送请求,服务器返回对应的响应 的功能

Publisher Confirms(发布确认)

Publisher Confirms 模式是 RabbitMQ 提供的一种确保消息可靠发送到 RabbitMQ 服务器的机制,在这种模式下,生产者可以等待 RabbitMQ 服务器的确认,以确保消息已经被服务器接收并处理

其过程为:

(1)生产者将 Channel 设置为 confirm 模式(通过调用 channel.confirmSelect() 完成),发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些 序列号与消息关联 起来,以便追踪消息的状态

(2)当消息被 RabbitMQ 服务器接收并处理后,服务器会异步地向生产者发送一个确认(ACK),其中包含消息的唯一 ID,表示消息已经送达

通过 Publisher Confirms 模式,生产者可以确保消息被 RabbitMQ 服务器成功接收,从而避免消息丢失

适用场景:对数据安全性要求较高,如金融交易,订单处理等

在基本了解了 RabbitMQ 的 7 种工作模式后,我们就来通过代码简单实现一下这 7 种工作模式

代码实现

Simple(简单模式)

简单模式下,只有一个生产者和一个消费者,生产者生产的消息存储到队列后,都由这个消费者消费

在  RabbitMQ入门-CSDN博客 中的入门代码的工作模式就是简单模式,因此,在这里就不再进行过多解释了

首先引入依赖:

        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version></dependency>

生产者代码

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost("49.232.238.62"); // ip 的默认值为 localhostfactory.setPort(5672); // 默认值为 5672factory.setVirtualHost("test01"); // 虚拟主机,默认值为 /// 账号factory.setUsername("admin"); // 用户名,默认为 guestfactory.setPassword("123456"); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare("simple.test", true, false, false, null);// 6. 通过 channel 发送消息到队列中String message = "test...";channel.basicPublish("", "simple.test", null, message.getBytes());System.out.println("消息:" + message + " 发送成功");// 7. 释放资源channel.close();connection.close();}
}

消费者代码

public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost("49.232.238.62"); // ip 的默认值为 localhostfactory.setPort(5672); // 默认值为 5672factory.setVirtualHost("test01"); // 虚拟主机,默认值为 /// 账号factory.setUsername("admin"); // 用户名,默认为 guestfactory.setPassword("123456"); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare("simple.test", true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume("simple.test", true, consumer);// 7. 释放资源channel.close();connection.close();}
}

Work Queues(工作队列)

生产者代码

工作队列模式下,由一个生产者生产消息,多个消费者共同接收消息,消费者之间是竞争关系,每个消息只能被一个消费者接收

由于我们每次连接时都要使用 IP、端口号、虚拟主机名等,因此,我们可以将它们提取出来,放到 Constants 类中:

public class Constants {public static final String HOST = "49.232.238.62";public static final int PORT = 5672;public static final String VIRTUAL_HOST = "test01";public static final String USER_NAME = "admin";public static final String USER_PASSWORD = "123456";
}

声明 工作队列 模式下使用的队列:

    // 工作模式public static final String WORK_QUEUE = "work.queue";

接下来,我们就来实现生产者的代码:

工作队列模式与简单模式的区别在于工作模式下有多个消费者,因此生产者的消费代码与简单模式下差别不大,但在发送消息时,我们一次发送 20 条消息:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 6. 通过 channel 发送消息到队列中for (int i = 0; i < 20; i++) {String message = "work test... " + i;channel.basicPublish("", Constants.WORK_QUEUE, null, message.getBytes());}System.out.println("消息发送成功!");// 7. 释放资源channel.close();connection.close();}
}

运行代码,可以看到 work.queue 队列被创建,且存储了 20 条消息

 接下来,我们继续编写消费者代码

消费者代码

消费者的代码也与简单模式下的代码差别不大,但在最后,我们并不进行资源的释放:

Consumer1:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);}
}

Consumer2:

public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);}
}

启动 Consumer1 和 Consumer2:

由于我们之前先启动了生产者,此时再启动消费者,由于消息较少,因此,先启动的 Consumer1 会瞬间将 20 条消息消费掉

因此,再次启动 Producer,观察结果:

可以看到两个消费者分别消费了 10 条消息

Publish/Subscribe(发布/订阅)

生产者代码

发布/订阅 模式中,多了 Exchange 角色

Exchange 常见有三种类型,分别代表不同的路由规则:

Fanout:广播,将消息交给所有绑定到交换机的队列(Publish/Subscribe 模式)

Direct:定向,将消息交给符合指定 RoutingKey 的队列(Routing 模式)

Topics:通配符,将消息交给符合 Routing Pattern(路由模式)的队列(Topics 模式)

此时,在 发布/订阅 模式下,我们就需要 声明交换机,并绑定队列和交换机

我们首先来看声明交换机

使用 channel.exchangeDeclare 方法来创建交换机,我们来看 exchangeDeclare 方法:

Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;

参数: 

exchange:交换机名称

type:交换机类型

durable:是否持久化,当为 true 时,会将交换机存盘,在服务器重启时不会丢失相关信息

autoDelete:是否自动删除,自动删除的前提是至少有一个队列或交换机与这个交换机绑定,之后与这个交换机绑定的队列或交换机都会与此解绑

internal:是否为内部使用,若设置为 true,则表示内部使用,客户端无法直接发送消息到这个交换机中,只能通过交换机路由到交换机这种方式

arguments:相关参数

其中,type 表示交换机类型,其类型为 BuiltinExchangeType,也可以为 String

我们来看  BuiltinExchangeType,它是一个枚举类型

DIRECT("direct"):定向,直连,将消息交给符合指定 RoutingKey 的队列(Routing 模式)

FANOUT("fanout"):扇形,广播,将消息交给所有绑定到交换机的队列(Publish/Subscribe 模式)

TOPIC("topic"):通配符,将消息交给符合 Routing Pattern(路由模式)的队列(Topics 模式)

HEADERS("headers"):参数模式(较少使用)

返回值:

Exchange.DeclareOk:声明确认方法,用于指示已成功声明交换

 在 Constants 类中定义 发布/订阅 模式下使用的交换机和两个队列:

    // 广播模式public static final String PUBLISH_CHANGE = "fanout";public static final String PUBLISH_QUEUE_1 = "publish.queue.1";public static final String PUBLISH_QUEUE_2 = "publish.queue.2";

建立连接,并声明交换机和两个队列:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare(Constants.PUBLISH_CHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);// 6. 声明队列channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);}
}

交换机的类型为 BuiltinExchangeType.FANOUT 广播模式

接着,我们使用 channel.queueBind 方法将队列和交换机进行绑定:

Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

queue:要绑定的队列名称

exchange:要绑定的交换机名称

routingKey:路由 key,路由规则

arguments:相关参数

在这里的 routingKey,其实就是 BindingKey将交换机与队列关联起来,从而让 RabbitMQ 知道如何正确地将消息路由到队列

在发布/订阅模式下,交换机类型为 fanoutroutingKey 设置为 "",表示每个消费者都可以收到全部信息

        // 7. 绑定交换机和队列channel.queueBind(Constants.PUBLISH_QUEUE_1, Constants.PUBLISH_CHANGE, "", null);channel.queueBind(Constants.PUBLISH_QUEUE_2, Constants.PUBLISH_CHANGE, "", null);

接下来,就可以发送消息了:

        // 8. 发送消息for (int i = 0; i < 20; i++) {String message = "work test... " + i;channel.basicPublish(Constants.PUBLISH_CHANGE, "", null, message.getBytes());}System.out.println("消息发送成功!");// 9. 释放资源channel.close();connection.close();

完整代码:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare(Constants.PUBLISH_CHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);// 6. 声明队列channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);// 7. 绑定交换机和队列channel.queueBind(Constants.PUBLISH_QUEUE_1, Constants.PUBLISH_CHANGE, "", null);channel.queueBind(Constants.PUBLISH_QUEUE_2, Constants.PUBLISH_CHANGE, "", null);// 8. 发送消息for (int i = 0; i < 20; i++) {String message = "work test... " + i;channel.basicPublish(Constants.PUBLISH_CHANGE, "", null, message.getBytes());}System.out.println("消息发送成功!");// 9. 释放资源channel.close();connection.close();}
}

运行代码,并观察结果:

可以看到,publish.queue.1 和 publish.queue.2 中都已经存储了 20 条消息

查看 fanout 的绑定关系:

成功绑定 publish.queue.1 和 publish.queue.2:

接下来,我们继续编写消费者代码

消费者代码

交换机和队列的绑定关系已经在生产者中实现了,因此,消费者代码中可以不必再写

其实现与 工作队列模式 下是基本相同的,只需要修改读取的队列即可

Consumer1:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.PUBLISH_QUEUE_1, true, consumer);}
}

Consumer2:

public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.PUBLISH_QUEUE_2, true, consumer);}
}

运行 Consumer1 和 Consumer2:

Consumer1 和 Consumer2 都接收到了这 20 条消息

Routing(路由模式)

生产者代码

Routing 模式下,队列与交换机之间的绑定,不再是任意的绑定了,而是需要指定一个 BindingKey

生产者在向 交换机 发送消息时,也需要指定消息的 RoutingKey

交换机不会将消息发送给每一个绑定的 key,而是会根据消息的 RoutingKey 进行判断,只有队列绑定时的 BindingKey 和发送消息的 RoutingKey 完全一致时,才会接收消息

先在 Constants 类中定义 路由模式下使用的交换机和队列:

    // 路由模式public static final String ROUTING_CHANGE = "routing";public static final String ROUTINT_QUEUE_1 = "routing.queue.1";public static final String ROUTINT_QUEUE_2 = "routing.queue.2";

路由模式下,生产者的代码与 发布/订阅模式 下的区别在于:交换机的类型不同 以及 绑定队列的 BindKey 不同

(1)交换机类型不同

在声明交换机时,交换机的类型为 BuiltinExchangeType.DIRECT

        // 5. 声明交换机channel.exchangeDeclare(Constants.ROUTING_CHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);

(2)声明队列

        // 6. 声明队列channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);

(3)交换机与队列的绑定方式不同

        // 7. 绑定交换机和队列channel.queueBind(Constants.ROUTINT_QUEUE_1, Constants.ROUTING_CHANGE, "a", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "a", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "b", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "c", null);

此时,我们就可以发送消息了

在发送消息时,需要指定 RoutingKey

        // 8. 发送消息String messageA = "test a...";channel.basicPublish(Constants.ROUTING_CHANGE, "a", null, messageA.getBytes());String messageB = "test b...";channel.basicPublish(Constants.ROUTING_CHANGE, "b", null, messageB.getBytes());String messageC = "test c... ";channel.basicPublish(Constants.ROUTING_CHANGE, "c", null, messageB.getBytes());System.out.println("消息发送成功!");

完整代码:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare(Constants.ROUTING_CHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);// 6. 声明队列channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);// 7. 绑定交换机和队列channel.queueBind(Constants.ROUTINT_QUEUE_1, Constants.ROUTING_CHANGE, "a", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "a", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "b", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "c", null);// 8. 发送消息String messageA = "test a...";channel.basicPublish(Constants.ROUTING_CHANGE, "a", null, messageA.getBytes());String messageB = "test b...";channel.basicPublish(Constants.ROUTING_CHANGE, "b", null, messageB.getBytes());String messageC = "test c... ";channel.basicPublish(Constants.ROUTING_CHANGE, "c", null, messageB.getBytes());System.out.println("消息发送成功!");// 9. 释放资源channel.close();connection.close();}
}

运行代码,并观察结果:

 routing.queue.1 中有 1 条消息,routing.queue.2 中有两条消息

查看 routing 交换机与 队列的绑定关系:

接下来,我们继续编写消费者的代码 

消费者代码

消费者代码与 发布/订阅 模式下基本相同,只需要修改队列名称即可:

Consumer1:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer1 成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.ROUTINT_QUEUE_1, true, consumer);}
}

Consumer2:

public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer2 成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.ROUTINT_QUEUE_2, true, consumer);}
}

运行结果:

Topics(通配符模式)

生产者代码

相比于 routing 模式,topics 类型的交换机在匹配规则上进行了扩展,BindingKey 支持通配符匹配

其中,RoutingKey 是一系列由 . 分割的单词,如 user.name、work.abc等

BindingKey 也和 RoutingKey 一样,由 . 分割的字符串

在 BindingKey 中可以存在两种特殊的字符串,用于模糊匹配:

* :表示能够匹配任意一个单词

#:表示能够匹配任意多个单词(可以为 0 个)

例如:

交换机 与 队列1(Q1)的 BindingKey 为 *.a.*

交换机 与 队列2(Q2)的 BindingKey 为 *.*.b

交换机 与 队列2(Q2)的 BindingKey 为 c.#

则:

若生产者的 RoutingKey 为 work.a.b,则消息会被路由到 Q1 和 Q2

若生产者的 RoutingKey 为 a.a.a,则消息会被路由到 Q1

若生产者的 RoutingKey 为 c.work.a,则消息会被路由到 Q2

若生产者的 RoutingKey 为 b.c.g,则消息会被丢弃,或是返回给生产者(需要设置 mandatory 参数)

接下来,我们就来实现 通配符模式下 的生产者:

先在 Constants 类中定义通配符模式下使用的交换机和队列:

    // 通配符模式public static final String TOPICS_CHANGE = "topics";public static final String TOPICS_QUEUE_1 = "topics.queue.1";public static final String TOPICS_QUEUE_2 = "topics.queue.2";

与 路由模式相比,发布订阅模式与其区别为:交换机类型不同 以及 绑定队列的 RoutingKey 不同

(1)交换机类型不同

交换机的类型为 BuiltinExchangeType.TOPIC

        // 5. 声明交换机channel.exchangeDeclare(Constants.TOPICS_CHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);


(2)声明队列

        // 6. 声明队列channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);

(3)交换机与队列的绑定方式不同

        // 7. 绑定交换机和队列channel.queueBind(Constants.TOPICS_QUEUE_1, Constants.TOPICS_CHANGE, "*.a.*", null);channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "*.*.b", null);channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "c.#", null);

此时,我们就可以发送消息了

在发送消息时,需要指定 RoutingKey

        // 8. 发送消息String message1 = "test work.a.b";channel.basicPublish(Constants.TOPICS_CHANGE, "work.a.b", null, message1.getBytes());String message2 = "test a.a.a";channel.basicPublish(Constants.TOPICS_CHANGE, "a.a.a", null, message2.getBytes());String message3 = "test c.work.a";channel.basicPublish(Constants.TOPICS_CHANGE, "c.work.a", null, message3.getBytes());System.out.println("消息发送成功!");

完整代码:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare(Constants.TOPICS_CHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);// 6. 声明队列channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);// 7. 绑定交换机和队列channel.queueBind(Constants.TOPICS_QUEUE_1, Constants.TOPICS_CHANGE, "*.a.*", null);channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "*.*.b", null);channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "c.#", null);// 8. 发送消息String message1 = "test work.a.b";channel.basicPublish(Constants.TOPICS_CHANGE, "work.a.b", null, message1.getBytes());String message2 = "test a.a.a";channel.basicPublish(Constants.TOPICS_CHANGE, "a.a.a", null, message2.getBytes());String message3 = "test c.work.a";channel.basicPublish(Constants.TOPICS_CHANGE, "c.work.a", null, message3.getBytes());System.out.println("消息发送成功!");// 9. 释放资源channel.close();connection.close();}
}

 运行并观察结果:

topics.queue.1 和 topics.queue.2 中都已经存储了两条消息

我们来看 topics.queue.1 中的消息:

我们继续实现消费者代码

消费者代码

Topics 模式的消费者代码与 Routing 模式下相同,只需要修改消费的队列名称即可:

Consumer1:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer1 成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.TOPICS_QUEUE_1, true, consumer);}
}

Consumer2:

public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer1 成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.TOPICS_QUEUE_2, true, consumer);}
}

运行 Consumer1 和 Consumer2,并观察结果:

RPC(RPC通信)

RPC(Remote Procedure Call),远程过程调用,是一种通过网络从远程计算机上请求服务,不需要了解底层网络的技术,类似于 Http 远程调用

RabbitMQ 实现 RPC 通信,是通过两个队列实现一个可回调的过程:

其过程为:

客户端(Client)发送请求消息到指定队列(rpc_queue),并在消息属性中设置 reply_to 字段,这个字段指定了一个回调队列(amq.gen-Xa2...),这个回调队列用于接收服务端的响应消息

服务器(Server)从队列 rpc_queue 中取出请求消息,处理请求后,将响应消息发送到 reply_to 指定的回调队列(amq.gen-Xa2...)

客户端(Client)在回调队列上等待响应消息,一旦接收到响应,客户端就会检查消息的 correlation_id 属性,确保其是否是所期望的响应

接下来,我们就来实现 RPC 的客户端:

客户端代码

客户端主要实现的功能有:

1. 发送请求消息到队列中

2. 从回调队列中读取响应消息

我们先来看发送请求消息到队列的过程: 

(1)声明两个队列:消息发送到的队列(Queue) 和 回调队列(replayQueue),并声明本次请求的唯一标志 corrId

(2)将 replayQueue 和 corrId 配置到  Queue 中

接下来,需要从回调队列中读取响应消息,若我们直接从回调队列中读取响应消息,此时,可能服务端还没有处理完请求,也就未将响应消息发送到回调队列中,就读取不到响应

因此,我们可以使用阻塞队列来监听回调队列中的消息

1. 使用阻塞队列阻塞当前进程,监听回调队列中的消息,回调队列中有消息时,将响应消息放到阻塞队列中

2. 阻塞队列中有消息后,主线程被唤醒,处理返回内容

先在 Constants 类中声明 RPC 模式下使用的两个队列:

    // RPC 模式public static final String RPC_QUEUE_1 = "rpc.queue1";public static final String RPC_QUEUE_2 = "rpc.queue2";

在这里,我们就不再声明交换机了,直接使用默认的交换机

声明 消息发送的队列 和 回调队列:

        // 声明队列channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);

使用 UUID 生成本次请求的唯一标志,并配置消息属性:

        // 本次请求的唯一标识String corrId = UUID.randomUUID().toString();

消息相关配置的类型为 BasicProperties,位于 com.rabbitmq.client.AMQP 下:

 AMQP.BasicProperties 提供了一个构造器,可以通过 builder() 来设置一些属性:

使用 correlationId 方法设置唯一标识,replyTo 方法设置回调队列:

        // 本次请求的唯一标识String corrId = UUID.randomUUID().toString();// 消息属性AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(corrId) // 唯一ID.replyTo(Constants.RPC_QUEUE_2) // 回调队列.build();

最后调用 build 方法创建实例 

使用内置交换机发送消息:

        // 7. 发送消息String message = "test rpc...";channel.basicPublish("", Constants.RPC_QUEUE_1, basicProperties, message.getBytes());

接着,使用阻塞队列存储回调结果:

        // 阻塞队列,存放回调结果,一次获取一条消息BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

 从回调队列中接收响应消息:

        // 8. 接收服务器的响应DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到回调消息: " + new String(body));// 判断标识是否正确if(corrId.equals(properties.getCorrelationId())) {queue.offer(new String(body, "UTF-8"));}}};channel.basicConsume(Constants.RPC_QUEUE_2, true, consumer);

 最后,从阻塞队列中获取响应消息:

        // 9. 获取响应消息String result = queue.take();System.out.println("result: " + result);

完整代码:

public class Client {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 使用默认的交换机// 5. 声明队列channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);// 6. 设置消息属性// 本次请求的唯一标识String corrId = UUID.randomUUID().toString();// 消息属性AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(corrId) // 唯一ID.replyTo(Constants.RPC_QUEUE_2) // 回调队列.build();// 7. 发送消息String message = "test rpc...";channel.basicPublish("", Constants.RPC_QUEUE_1, basicProperties, message.getBytes());// 阻塞队列,存放回调结果,一次获取一条消息BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);// 8. 接收服务器的响应DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到回调消息: " + new String(body));// 判断标识是否正确if(corrId.equals(properties.getCorrelationId())) {queue.offer(new String(body, "UTF-8"));}}};channel.basicConsume(Constants.RPC_QUEUE_2, true, consumer);// 9. 获取响应消息String result = queue.take();System.out.println("result: " + result);}
}

我们继续编写服务端代码

服务端代码

服务端要实现的功能为:

1. 从队列中接收请求消息

2. 根据消息内容处理请求消息,并将响应消息返回到回调队列中

我们先来实现接收消息:

建立连接、声明队列等过程都与 客户端代码相同

但需要注意的是,我们需要设置服务端同时最多只能获取一条消息

        // 6. 设置同时最多只能获取一条消息channel.basicQos(1);

若不设置 basicQos,RabbitMQ 会使用默认的 Qos 设置,其 prefetchCount 默认值为 0,当 prefetchCount 为 0 时,RabbitMQ 会根据内部实现和当前网络状况等因素,可能同时发送多条消息给消费者。这也就意味着,在默认情况下,消费者可能会同时接收到多条消息,但具体数量不是严格保证的,可能会有波动

而在 RPC 模式下,通常希望是一对一的消息处理,即,一个请求对应一个响应。服务端在处理完一个消息并确认后,才会接收到下一条消息

接收消息后,就可以对请求消息进行处理并返回响应结果了:

        // 7. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 将消息发到队列2中AMQP.BasicProperties replayProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();// 返回String message = new String(body);System.out.println("接收到消息: " + message);// 响应消息String response = "request: " + message + " 接收成功";channel.basicPublish("", properties.getReplyTo(), replayProperties, response.getBytes());// 对消息进行应答channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Constants.RPC_QUEUE_1, false, consumer);

需要注意的是,在这里我们需要手动对消息进行应答,而不是自动确认: 

在 RabbitMQ 中,basicConsume 方法的 autoAck 参数用于指定消费者是否会自动向消息队列确认消息

当设置为 true 时,消息队列会在将消息发送给消费者后,认为消息已经被成功消费,立即删除该条消息,这也就意味着,若消费者处理消息失败,消息就会丢失

当设置为 false 时,消息队列在将消息发送给消费者后,需要消费者显示地调用 basicAck 方式来确认消息,手动确认提供了更高的可靠性,保证消息不会被意外丢失,适用于消息处理重要且需要确保每个消息被正确处理的场景

完整代码:

public class Service {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 使用默认的交换机// 5. 声明队列channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);// 6. 设置同时最多只能获取一条消息channel.basicQos(1);// 7. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 将消息发到队列2中AMQP.BasicProperties replayProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();// 返回String message = new String(body);System.out.println("接收到消息: " + message);// 响应消息String response = "request: " + message + " 接收成功";channel.basicPublish("", properties.getReplyTo(), replayProperties, response.getBytes());// 对消息进行应答channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Constants.RPC_QUEUE_1, false, consumer);}
}

运行代码,观察结果:

Publisher Confirms(发布确认)

消息中间件,都会面临消息丢失的问题

消息丢失大概分为三种情况:

1. 生产者的问题:由于应用程序故障、网络抖动等各种原因,生产者没有成功向 broker 发送消息

2. 消息中间件的问题:生产者成功将消息发送给了 broker,但 broker 未能将消息保存好,导致消息丢失

3. 消费者的问题:broker 将消息发送给了消费者,消费者在消费消息时,未处理好,导致 broker 将消费失败的消息从队列中删除了

Rabbit 针对上述问题给出了相应的解决方案:

针对问题1,可以采用发布确认(Publisher Confirms)机制实现

针对问题2,可以通过持久化机制

针对问题3,可以采用消息应答机制

接下来,我们就来进一步学习 发布确认机制

发布确认的过程:

(1)生产者将 Channel 设置为 confirm 模式(通过调用 channel.confirmSelect() 完成),发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些序列号与消息关联起来,以便追踪消息的状态

(2)当消息被 RabbitMQ 服务器接收并处理后,服务器会向生产者发送一个确认(ACK),其中包含消息的唯一 ID,表示消息已经送达

其中,deliveryTag 包含了确认消息的序号,此外,broker 也可以设置 channel.basicAck 方法中的 multiple 参数,表示这个序号之前的所有消息都已经被处理

发送确认机制最大的好处在于它是 异步 的,生产者可以同时发布消息和等待信道返回确认消息

当消息最终得到确认之后,生产者可以通过 回调方法 来处理该确认消息

若 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack(Basic.Nack)命令,生产者同样可以在回调方法中处理该 nack 命令

使用发布确认机制,需要将信道设置为 confirm(确认)模式:

            // 开启信道Channel channel = connection.createChannel();// 开启信道确认模式channel.confirmSelect();

 发布模式有 3 种确认策略,我们分别来进行学习

由于使用每种策略时都需要建立连接,因此,我们将建立连接抽取出来:

    public static Connection createConnection() throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理return connection;}

Publishing Messages Individually(单独确认)

单独确认模式下,每发送一条消息,RabbitMQ 会在消息被成功持久化到队列或者被消费者成功接收后,发回一个确认(acknowledgment)。生产者可以收到关于每条消息的确认信息

也就是说,生产者发送消息后会等待每条消息的确认信号。如果消息发送成功,RabbitMQ 会返回一个确认信号;如果消息失败,RabbitMQ 会返回一个负确认信号(nack)

我们先在 Constans 类中声明会使用的队列:

    // 发布确认模式public static final String PUBLISH_CONFIRMS_QUEUE_1 = "publish.confims.queue1";public static final String PUBLISH_CONFIRMS_QUEUE_2 = "publish.confims.queue2";public static final String PUBLISH_CONFIRMS_QUEUE_3 = "publish.confims.queue3";

我们仍使用默认的交换机进行路由 

每次都发送 200 条消息:

public class Producer {public static int MESSAGE_COUNT = 200;
}

 每当发送一条消息,就使用 channel.waitForConfirms() 方法等待确认消息

void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

等待确认消息,当消息被确认,方法就会返回,若消息超时,就会抛出 TimeoutException 异常,若消息丢失,就会抛出 IOException

此外,我们记录 单独确认策略 发送消息的耗时:

public class Producer {public static int MESSAGE_COUNT = 200;public static int WAIT_TIME = 5000;public static void publishMessageIndividually() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 开启信道确认模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());// 等待确认channel.waitForConfirms(WAIT_TIME);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages individually in %d ms\n", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}
}

 完整代码:

public class Producer {public static int MESSAGE_COUNT = 200;public static int WAIT_TIME = 5000;// 建立连接public static Connection createConnection() throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理return connection;}// 单独确认模式public static void publishMessageIndividually() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 开启信道确认模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());// 等待确认channel.waitForConfirmsOrDie(WAIT_TIME);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages individually in %d ms\n", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}public static void main(String[] args) {// 单独确认模式publishMessageIndividually();}
}

运行结果:

 

可以看到,发送 200 条消息的耗时较长

且,单独确认策略是每发送一条消息后,就调用 channel.waitForConfirmsOrDie 方法,等待服务端的确认,也就是一种串行同步等待的方式,尤其是对于持久化的消息而言,需要等待消息确认存储在磁盘之后才会返回

但发布确认机制支持异步确认,即,可以一边发送消息,一边等待消息确认

我们接着看另外两种策略

Publishing Messages in Batches(批量确认)

批量确认会在每发送一批消息后,调用 waitForConfirms 方法,等待服务器的确认返回

我们每发送 50 条消息,就调用 waitForConfirms  方法进行确认:

    public static int BATCH_SIZE = 50;// 批量确认模式public static void publishMessageInBatches() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 设置为 confirm 模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_2, true, false, false, null);// 发送消息int messageCount = 0;long startTime = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_2, null, message.getBytes());messageCount++;// 批量确认if(messageCount == BATCH_SIZE) {channel.waitForConfirms(WAIT_TIME);messageCount = 0;}}// 消息发送完,若还有未确认消息,则进行最后的确认if (messageCount > 0) {channel.waitForConfirms(WAIT_TIME);}long endTime = System.currentTimeMillis();System.out.printf("publish %d messages in batch in %d ms\n", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}

需要注意的是,若我们发送的消息为 210 条,此时最后的十条消息未被确认,因此,我们在消息发送完成后,进行最后的确认

调用  publishMessageInBatches 方法,并观察结果:

我们可以看到,相比于单独确认策略,批量确认极大的提高了 confirm 的效率,但当出现了 Basic.Nack 或超时时,我们无法确定是哪一条消息出现了问题,客户端需要将这一批消息都进行重发,这也就重复发送了很多消息,当消息经常丢失时,批量确认的性能会不升反降

最后,我们来看 异步确认

Handling Publisher Confirms Asynchronously(异步确认)

异步确认提供了一个回调方法,服务端确认了一条或多条消息后,客户端会调用这个方法进行处理

Channel 接口提供了 addConfirmListener 方法,可以添加 ConfirmListener 回调接口

 

 ConfirmListener 接口中包含两个重要方法:

handleAck handleNack,分别对应处理 RabbitMQ 发送给生产者的 ack 和 nack

deliveryTag 表示发送消息的序号

multiple 表示是否批量确认,开启批量确认后,若 RabbitMQ 返回的消息序号为 20,则表明 20 条消息都已经接收成功;当不开启批量确认时,若 RabbitMQ 返回的消息序号为 20 ,则表明 20 条消息被成功接收

在使用异步确认策略时,我们需要为每个 Channel 维护一个已发送消息的序号集合,当收到 RabbitMQ 的 confirm 回调时,从集合中删除掉对应消息

Channel 开启 confirm 模式后,channel 上发送消息都会附带一个从 1 开始递增的 deliveryTag 序号。我们可以使用 SortedSet 的有序性来维护这个已发送消息的集合

实现步骤:

1. 使用有序集合存储未确认的消息序号

            // 使用有序集合来存储未确认的消息SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());

2. 当收到 ack 时,从集合中删除消息序号,若为批量确认,则删除小于等于当前消息序号的所有序号

            // 进行确认channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}});

3. 当接收到 nack 时,需要根据具体情况进行消息重发等操作

在这里,我们就不对其进行处理了,直接将消息清除:

                @Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {// 处理失败,消息重发...// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}

接着,我们发送消息,每当发送一条消息,就将其序号存储到有序集合中:

            // 发送消息long startTime = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;// 获取序号long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());// 存储序号confirmSet.add(nextPublishSeqNo);}

当有序集合为空时,消息确认完,因此,我们使用 while 循环等待消息确认完毕:

            // 消息确认完毕while (!confirmSet.isEmpty()) {Thread.sleep(10);}

若循环体中什么也不写,while 循环执行的速度会非常快,因此,每当判断一次,我们让其等待 10 ms

完整代码:

    // 异步确认模式public static void handlePublishConfirmAsynchronously() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 设置 confirm 模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_3, false, false, true, null);// 使用有序集合来存储未确认的消息SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());// 进行确认channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {// 处理失败,消息重发...// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}});// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;// 获取序号long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());// 存储序号confirmSet.add(nextPublishSeqNo);}// 消息确认完毕while (!confirmSet.isEmpty()) {Thread.sleep(10);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages and handled confirms asynchronously in %d ms\n", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}

运行结果:

可以看到,三种策略中,异步确认的表现更好

完整代码:

public class Producer {public static int MESSAGE_COUNT = 200;public static int WAIT_TIME = 5000;public static int BATCH_SIZE = 50;// 建立连接public static Connection createConnection() throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理return connection;}// 单独确认模式public static void publishMessageIndividually() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 开启信道确认模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());// 等待确认channel.waitForConfirmsOrDie(WAIT_TIME);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages individually in %d ms\n", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}// 批量确认模式public static void publishMessageInBatches() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 设置为 confirm 模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_2, true, false, false, null);// 发送消息int messageCount = 0;long startTime = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_2, null, message.getBytes());messageCount++;// 批量确认if(messageCount == BATCH_SIZE) {channel.waitForConfirms(WAIT_TIME);messageCount = 0;}}// 消息发送完,若还有未确认消息,则进行最后的确认if (messageCount > 0) {channel.waitForConfirms(WAIT_TIME);}long endTime = System.currentTimeMillis();System.out.printf("publish %d messages in batch in %d ms\n", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}// 异步确认模式public static void handlePublishConfirmAsynchronously() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 设置 confirm 模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_3, false, false, true, null);// 使用有序集合来存储未确认的消息SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());// 进行确认channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {// 处理失败,消息重发...// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}});// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;// 获取序号long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());// 存储序号confirmSet.add(nextPublishSeqNo);}// 消息确认完毕while (!confirmSet.isEmpty()) {Thread.sleep(10);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages and handled confirms asynchronously in %d ms\n", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}public static void main(String[] args) {// 单独确认模式publishMessageIndividually();// 批量确认模式publishMessageInBatches();// 异步确认handlePublishConfirmAsynchronously();}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com