您的位置:首页 > 游戏 > 手游 > 网络设计属于什么专业_seo优化包括哪些内容_刷推广软件_永久免费域名申请

网络设计属于什么专业_seo优化包括哪些内容_刷推广软件_永久免费域名申请

2025/1/5 6:34:54 来源:https://blog.csdn.net/2301_78320637/article/details/144010244  浏览:    关键词:网络设计属于什么专业_seo优化包括哪些内容_刷推广软件_永久免费域名申请
网络设计属于什么专业_seo优化包括哪些内容_刷推广软件_永久免费域名申请

文章目录

  • 七种工作模式介绍
  • 简单模式
    • 基本概念
    • 代码实现
  • 工作队列模式
    • 基本概念
    • 代码实现
  • 发布订阅模式
    • 基本概念
    • 代码实现
  • 路由模式
    • 基本概念
    • 代码实现
  • 通配符模式
    • 基本概念
    • 代码实现
  • `RPC`(远程过程调用模式)
    • 基本概念
    • 代码实现
  • `Publisher Confirms`(发布确认模式)
    • `MQ`是如何保证消息的可靠性的
    • 什么是发布确认机制
      • 发布确认机制的好处
      • 发布确认的三大策略
        • 单独确认策略
          • 代码实现
        • 批量确认策略
          • 代码实现
        • 异步确认策略
          • 代码实现

七种工作模式介绍

  • 简单模式
  • 工作队列
  • 订阅与发布模式
  • 路由模式
  • 主题模式
  • RPC模式
  • 发布确认模式

简单模式

基本概念

在这里插入图片描述

  • 特点:一个生产者(Producer),一个消费者(Consumer),一个队列(Queue),没有交换机。消费者监听该队列。
  • 应用场景:适用于简单的点对点消息传递场景。

代码实现

Producer端实现:

public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.参数设置factory.setHost("123.60.91.50");//ip地址factory.setPort(5162);//默认的提供rabbitmq服务的端口号factory.setUsername("admin");//用户名factory.setPassword("fengadmin");//密码factory.setVirtualHost("fbl");//虚拟机名称System.out.println("成功设置");Connection connection=factory.newConnection();System.out.println("连接成功");//3.创建一个通道Channel channel =connection.createChannel();//4.创建一个队列/* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;* 参数:  1.queue:队列名称2.durable:是否持久化,当mq重启之后,消息还在,相当于存储在硬盘当中3.exclusive:是否独占,只有一个消费者监听队列;当connection关闭时,是否删除队列4.autoDelete:是否自动删除,当没有consumer时,自动删除掉5.arguments:一些其他参数*/channel.queueDeclare("hello",true,false,false,null);//5.通过channel将消息发送到队列中for (int i = 0; i < 10; i++) {String msg="hello rabbitmq--- :"+i;/*void basicPublish(String exchange,String routingKey, BasicProperties props, byte[] body)throws IOException;*参数: 1.exchange:交换机的名称,简单模式下,交换机会使用默认的""2.routingKey:路由名称,在内置交换机下,路由名称=队列名称3.props:配置信息4.body:发送消息的数据 (参数形式时byte[])*/channel.basicPublish("","hello",null,msg.getBytes());System.out.println(msg+" 消息发送成功");}//6.释放资源channel.close();connection.close();//注意先关channel,再关connection,如果顺序反了,会出问题//或者直接关闭connectin即可}
}

Consumer的实现:

//消费者
public class ConsumerDemo1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.参数设置factory.setHost("123.60.91.50");//ip地址factory.setPort(5162);//默认的提供rabbitmq服务的端口号factory.setUsername("admin");//用户名factory.setPassword("fengadmin");//密码factory.setVirtualHost("fbl");//虚拟机名称System.out.println("成功设置");Connection connection=factory.newConnection();System.out.println("连接成功");//3.创建一个通道/*为什么在消费者方也要去创建一个通道?* 因为一般来说生产者和消费者并不在同一个项目中开发的* 如果直接生产者的项目还没有上线,那么就不存在channel这个通道,那么消费者* 去该通道中获取的时候,会出现报错*/Channel channel =connection.createChannel();//4.声明队列channel.queueDeclare("hello",true,false,false,null);//5.接收消息,并进行消费/** String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;参数名称:* 1.queue:队列名称* 2.autoAck:是否自动确认,消费者收到消息之后,手动和MQ进行确认,还是自动进行确认* 3.callback:回调对象* *///回调方法:当收到消息后,会自动执行该方法/*参数: 1.consumerTag:标识2.envelope:获取一些消息,交换机,路由key...3.properties:配置信息4.body:数据* */DefaultConsumer consumer=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: "+new String(body));}};channel.basicConsume("hello",true,consumer);//try { TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}//6.释放资源,消费者相当于是一个监听程序,不需要关闭资源
//        channel.close();
//        connection.close();}
}

重点API:

  • 声明队列:queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments);

    • queue:队列名称
    • durable:是否持久化,当mq重启之后,消息还在,相当于存储在硬盘当中
  • autoDelete:是否自动删除,当没有consumer时,自动删除掉

  • arguments:一些其他参数

工作队列模式

基本概念

  • 特点:一个生产者(Producer),无交换机,一个队列,多个消费者(Consumer)。多个消费者监听一个队列,对消息的消费是轮询方式接收。
    应用场景:适用于处理消息较多的情况,多个消费者共同处理同一个队列中的消息,提高消息处理速度。

代码实现

发布订阅模式

基本概念

在这里插入图片描述

  • 特点:一个生产者(Producer),一个交换机(Exchange),多个队列,多个消费者(Consumer)。消费者监听队列。交换机只负责将消息绑定到队列中,不会存储消息。发布订阅模式的交换机类型属于Fanout类型。
  • 应用场景:适用于一对多的消息广播场景,如日志记录、即时通知等。

交换机类型:
交换机分为三种,分别代表着不同的消息路由类型:

  1. Fanout:广播,将消息交给所有绑定到交换机的队列,即(Publish/Subcribe模式)
  2. Direct:定向,将消息交给符合指定的routing key的队列(`Routing``模式)
  3. Topic:通配符,将消息交给符合routing pattern(路由模式)的队列(Topics模式)

Routing KeyBinding Key之间的区别:
Routing KeyBinding Key之间是不存在明显的区分的,在方法中参数的名称也经常混用,我们可以从意义上来区分:
Binding Key是用来将交换机和队列之间进行绑定的,Routing Key是生产者和交换机之间进行路由的标记

代码实现

//生产者:
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.参数设置factory.setHost(Constants.IP);//ip地址factory.setPort(Constants.PORT);//默认的提供rabbitmq服务的端口号factory.setUsername(Constants.USER_NAME);//用户名factory.setPassword(Constants.USER_PASSWORD);//密码factory.setVirtualHost(Constants.VHOST);//虚拟机名称System.out.println("成功设置");Connection connection=factory.newConnection();System.out.println("连接成功");//3.创建一个通道Channel channel =connection.createChannel();//4.声明一个交换机/*参数:* 1.交换机的名称* 2.交换机的类型(4种类型)* 3.是否持久化* 4.是否自动删除* 5.是否是内置的交换机(如果是内置交换机,就无法从客户端处指定交换机)* 6.一些参数*/channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true);//5.声明两个队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//6.声明交换机和队列之间的绑定关系/*queueBind(String exchange, String queue, String routingKey) throws IOException;* 参数:* 1.exchange:交换机名称* 2.queue:队列名称* 3.routingKey:相当于BingKey* */channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");System.out.println("交换机和队列绑定成功");//7.发布消息
//5.通过channel将消息发送到队列中for (int i = 0; i < 10; i++) {String msg="hello fanout--- :"+i;channel.basicPublish(Constants.FANOUT_EXCHANGE,"",null,msg.getBytes());System.out.println(msg+" 消息发送成功");}//8.释放资源channel.close();connection.close();}
}

路由模式

基本概念

在这里插入图片描述

  • 特点:一个生产者,一个交换机,多个队列,多个消费者。生产者发送消息时会发送一个Routing Key给交换机,交换机通过这个Routing Key绑定到对应规则的队列上,实现消息的分发。
  • 应用场景:适用于需要将不同级别的消息数据路由到特定的消息队列中的场景。

代码实现

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.参数设置factory.setHost(Constants.IP);//ip地址factory.setPort(Constants.PORT);//默认的提供rabbitmq服务的端口号factory.setUsername(Constants.USER_NAME);//用户名factory.setPassword(Constants.USER_PASSWORD);//密码factory.setVirtualHost(Constants.VHOST);//虚拟机名称System.out.println("成功设置");Connection connection=factory.newConnection();System.out.println("连接成功");//3.创建一个通道Channel channel =connection.createChannel();//4.声明一个交换机/*参数:* 1.交换机的名称* 2.交换机的类型(4种类型)* 3.是否持久化* 4.是否自动删除* 5.是否是内置的交换机(如果是内置交换机,就无法从客户端处指定交换机)* 6.一些参数*/channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT,true);//5.声明三个队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//6.声明交换机和队列之间的绑定关系/*queueBind(String exchange, String queue, String routingKey) throws IOException;* 参数:* 1.exchange:交换机名称* 2.queue:队列名称* 3.routingKey:相当于BingKey* */channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");System.out.println("交换机和队列绑定成功");//7.发布消息//只发送到routingKey=="a"中for (int i = 0; i < 10; i++) {String msg="hello direct--- :"+i;channel.basicPublish(Constants.DIRECT_EXCHANGE,"a",null,msg.getBytes());System.out.println(msg+" 消息发送成功");}for (int i = 0; i < 20; i++) {String msg="hello direct--- :"+i;channel.basicPublish(Constants.DIRECT_EXCHANGE,"b",null,msg.getBytes());System.out.println(msg+" 消息发送成功");}//8.释放资源channel.close();connection.close();}
}

重要API:

  • 声明交换机:channel.exchangeDeclare(String exchange, BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal);
    • exchange:交换机名称
    • type:交换机的类型,一共四种类型(direct,fanout,topic,headers)
    • durable:是否持久化
    • autoDelete:是否自动删除
    • internal:是否是内置交换机.(如果是内置交换机,不能通过客户端直接发送消息到该交换机上)
  • 队列和交换机绑定:queueBind(String exchange, String queue, String routingKey)
    • Exchange:交换机的名称
    • Queue:队列的名称
    • routingKey:路由参数
  • 发送消息到指定的队列中:basicPublish(String exchange, String routingKey, BasicProperties prop, byte[] message)
    • exchange:交换机名称
    • routingKey:路由参数
    • prop:配置参数
    • message:信息

通配符模式

基本概念

  • 特点:一个生产者,一个交换机,多个队列,多个消费者。Topics模式在Routing模式的基础上,给队列绑定带通配符的路由关键字。只要消息的Routing Key能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用Topic交换机。
    *号:代表匹配一个词(只能一个,不能多也不能少)。
    #号:代表匹配一个或多个词(可以没有,或者有很多)。
  • 应用场景:适用于需要对消息数据中的Routing Key进行通配符匹配,将满足条件的消息数据分发到特定的队列中的场景。
    在这里插入图片描述

topic类型的交换机在匹配规则上,有一些要求:

  1. RoutingKey是一系列用.分割的单词,比如a.orange.b,b.rabbit.c
  2. BindingKeyRoutingKey一样,也是点.分割的字符串
  3. BindingKey中存在两种特殊的字符串,用于模糊匹配
    a. *表示任意一个单词
    b. #表示任意多个单词

代码实现

//生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.参数设置factory.setHost(Constants.IP);//ip地址factory.setPort(Constants.PORT);//默认的提供rabbitmq服务的端口号factory.setUsername(Constants.USER_NAME);//用户名factory.setPassword(Constants.USER_PASSWORD);//密码factory.setVirtualHost(Constants.VHOST);//虚拟机名称System.out.println("成功设置");Connection connection=factory.newConnection();System.out.println("连接成功");//3.创建一个通道Channel channel =connection.createChannel();//4.声明一个交换机/*参数:* 1.交换机的名称* 2.交换机的类型(4种类型)* 3.是否持久化* 4.是否自动删除* 5.是否是内置的交换机(如果是内置交换机,就无法从客户端处指定交换机)* 6.一些参数*/channel.exchangeDeclare(Constants.TOPICS_EXCHANGE, BuiltinExchangeType.TOPIC,true);//5.声明三个队列channel.queueDeclare(Constants.TOPICS_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.TOPICS_QUEUE2,true,false,false,null);//6.声明交换机和队列之间的绑定关系/*queueBind(String exchange, String queue, String routingKey) throws IOException;* 参数:* 1.exchange:交换机名称* 2.queue:队列名称* 3.routingKey:相当于BingKey* */channel.queueBind(Constants.TOPICS_QUEUE1,Constants.TOPICS_EXCHANGE,"*.orange.*");channel.queueBind(Constants.TOPICS_QUEUE2,Constants.TOPICS_EXCHANGE,"*.*.rabbit");channel.queueBind(Constants.TOPICS_QUEUE2,Constants.TOPICS_EXCHANGE,"lazy.*");System.out.println("交换机和队列绑定成功");//7.发布消息//只发送到routingKey=="a"中for (int i = 0; i < 10; i++) {String msg="hello direct--- :"+i;channel.basicPublish(Constants.TOPICS_EXCHANGE,"a.orange.b",null,msg.getBytes());System.out.println(msg+" 消息发送成功");}for (int i = 0; i < 20; i++) {String msg="hello direct--- :"+i;channel.basicPublish(Constants.TOPICS_EXCHANGE,"a.b.rabbit",null,msg.getBytes());System.out.println(msg+" 消息发送成功");}//8.释放资源channel.close();connection.close();}
}

RPC(远程过程调用模式)

基本概念

在这里插入图片描述

  • 特点:客户端通过RabbitMQ发送消息到服务端,服务端调用函数对消息进行处理,再将处理结果通过另一消息队列返回给客户端。严格意义来说,这种模式违背了消息队列(MQ)的初衷,它需要等待服务端放回结果。可以视为RabbitMQ的一种扩展,为了实现消息可靠性投递。
  • 应用场景:适用于需要客户端与服务端之间进行双向通信的场景。

rpc通信的工作流程:
Client:

  1. 发送请求Request,且携带reply_tocorrelation_id两个参数
  • 在消息属性中设置replyTo字段,这个字段指定了一个回调队列,服务器处理之后,会将响应的结果发送到这个队列中.
  • 客户端在回调队列上等待消息.一旦接收到响应,客户端就会检查消息的correlation_id属性,以确保是它所期望的响应
  1. 接收响应(校验correlation_id)
    Server:
  2. 接收请求,进行响应
  3. 发送响应(按照客户端指定的replyTo,设置correlation_id)

代码实现

client代码:

{public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.参数设置factory.setHost(Constants.IP);//ip地址factory.setPort(Constants.PORT);//默认的提供rabbitmq服务的端口号factory.setUsername(Constants.USER_NAME);//用户名factory.setPassword(Constants.USER_PASSWORD);//密码factory.setVirtualHost(Constants.VHOST);//虚拟机名称System.out.println("成功设置");Connection connection=factory.newConnection();System.out.println("连接成功");//3.创建一个通道Channel channel =connection.createChannel();//4.声明一个请求队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);channel.queueDeclare(Constants.RPC_REPONSE_QUEUE,true,false,false,null);//唯一标志本次请求String corrId= UUID.randomUUID().toString();//生成附带的参数属性AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().correlationId(corrId)//唯一标志本次请求.replyTo(Constants.RPC_REPONSE_QUEUE)//设置回调队列.build();String msg="hello rpc....";//5.向请求队列发送请求channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,properties,msg.getBytes());//接受响应//阻塞队列,用于存储回调的结果final BlockingDeque<String> reponse=new LinkedBlockingDeque<>(1);//8.接收消息DefaultConsumer consumer=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: "+new String(body));if(properties.getCorrelationId().equals(corrId)){//校验一致reponse.offer(new String(body,"UTF-8"));}}};//9.从响应队列中消费channel.basicConsume(Constants.RPC_REPONSE_QUEUE,true,consumer);String result=reponse.take();System.out.println("[RPC Client接收到响应: ]"+result);//释放资源channel.close();connection.close();}
}

重要的API:

  • 唯一标志本次请求: String corrId= UUID.randomUUID().toString();
  • 生成附带的参数属性:AMQP.BasicProperties properties=new AMQP.BasicProperties().builder() .correlationId(corrId)//唯一标志本次请求 .replyTo(Constants.RPC_REPONSE_QUEUE)//设置回调队列 .build();

server代码:

public class RpcServer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.参数设置factory.setHost(Constants.IP);//ip地址factory.setPort(Constants.PORT);//默认的提供rabbitmq服务的端口号factory.setUsername(Constants.USER_NAME);//用户名factory.setPassword(Constants.USER_PASSWORD);//密码factory.setVirtualHost(Constants.VHOST);//虚拟机名称System.out.println("成功设置");Connection connection=factory.newConnection();System.out.println("连接成功");//3.创建一个通道Channel channel =connection.createChannel();//4.声明一个请求队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);channel.queueDeclare(Constants.RPC_REPONSE_QUEUE,true,false,false,null);//5.接收一个请求channel.basicQos(1);//设置最多只能接受一个消息DefaultConsumer consumer=new DefaultConsumer(channel){//回调机制@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//生成附带的参数AMQP.BasicProperties prop=new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();//生成响应结果String response=new String(body);//发送响应到响应队列中channel.basicPublish("",properties.getReplyTo(),prop,response.getBytes());//对消息进行应答channel.basicAck(envelope.getDeliveryTag(),false);}};//从request队列中接受请求channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,consumer);}
}

Publisher Confirms(发布确认模式)

MQ是如何保证消息的可靠性的

在这里插入图片描述
作为消息中间件,都会面临消息丢失的问题,消息丢失的问题大概可以分为三种情况:

  1. 生产者问题:因为应用程序故障,网络抖动等原因,生产者没有向Borker发送消息;
  2. 消息中间件自身的问题:生产者成功将消息发送给了Borker,但是Borker并没有把消息保存好,导致消息丢失.
  3. 消费者问题:Broker发送消息给到消费者,消费者在消费消息时,因为没有处理好,导致Borker将发送失败的消息从队列中删除了.

这里我们可以针对这三种情况,给出三种不同的解决方案:

  • 生产者产生问题:发布确认机制
  • Borker产生问题:持久化机制
  • 消费者产生问题:消息应答机制

现在我们来学习发布确认机制

什么是发布确认机制

在这里插入图片描述

  • 特点:指的是提供者可靠投递到交换机的过程,不会因为网络或者其他问题导致消息丢失。在这个模式下,可以在提供者注册一个回调函数。当消息发送后,不管如何都会触发这个回调函数,可以通过这个回调函数来判断是否到达交换机。
  • 应用场景:适用于需要确保消息可靠投递到交换机的场景。

发布确认机制是RabbitMQ的七大工作模式之一.

  • 生产者将信道设定为confirm(确认)模式
  • 信道进入confirm模式,会将在该信道上发布的消息都指派一个唯一标识ID(从1开始);
  • 一旦消息进入到所匹配的队列时,RabbitMQ就会发送一个确认给生产者(包含消息的唯一ID);
  • 生产者就可以得知消息已经正确到达目的队列中了
  • 如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出
  • borker回传给生产者的确认消息中存在两个属性:deliveryTagmultiple,其中deliveryTag 表示确认消息的序号,multiple表示是否批量处理

发布确认机制的好处

发布确认机制最大的好处在于异步,生产者可以同时发布消息和等待信道返回确认消息

  1. 当消息最终得到确认之后,生产者可以通过回调的方法来处理该确认信息
  2. 如果RabbitMQ因为Borker内部错误导致消息丢失,那么就会向生产者发送一条nack命令,生产者同样可以在回调方法中处理该nack命令(也就是说Borker会给生产者发送的ACK分别为ack或者nack)

发布确认的三大策略

单独确认策略

特点:每发送一条消息,就会等待确认信息,收到确认信息之后,才会接着发送消息

代码实现
//单独确认策略
public static void individually() throws Exception {try (Connection connection = createConnection()) {Channel channel = connection.createChannel();//开启信道的确认模式channel.confirmSelect();//声明队列channel.queueDeclare(Constants.PUBLISHER_QUEUE1, true, false, false, null);//循环发送消息,调用等待确认消息方法long start = System.currentTimeMillis();for (int i = 0; i < 200; i++) {String msg = "消息:" + i;channel.basicPublish("", Constants.PUBLISHER_QUEUE1, null, msg.getBytes());channel.waitForConfirmsOrDie(5_000);//等待确认消息,只要消息被确认,该方法就会返回}long end = System.currentTimeMillis();System.out.format("单独确认策略,用时:%d\n", end - start);}
}

重点API:
channel.waitForConfirmsOrDie(5_000);:等待确认消息,只要消息被确认,该方法就会返回

批量确认策略
代码实现
//批量确认策略
public static void inBatch () throws Exception {try (Connection connection = createConnection()) {Channel channel = connection.createChannel();//开启信道的确认模式channel.confirmSelect();//声明队列channel.queueDeclare(Constants.PUBLISHER_QUEUE2, true, false, false, null);long start = System.currentTimeMillis();int batchSize=100;int curSize=0;for (int i = 0; i < 200; i++) {String msg="消息: "+i;channel.basicPublish("", Constants.PUBLISHER_QUEUE2, null, msg.getBytes());curSize++;if(batchSize==curSize){channel.waitForConfirmsOrDie(5_000);//等待确认消息,只要消息被确认,该方法就会返回curSize=0;}}if(curSize>0){//保证消息确认完毕channel.waitForConfirmsOrDie(5_000);//等待确认消息,只要消息被确认,该方法就会返回curSize--;}long end = System.currentTimeMillis();System.out.format("批量确认策略,用时:%d\n", end - start);}
}

相比于单独确认策略,批量确认大大提升了效率,但其缺点是:如果出现了Nack信息或者超时,我们不清楚是哪条消息出现了问题,客户端这时需要将这一批次的消息全部重发,这样会带来明显的重复消息数量。当消息经常丢失的时候,批量确认的性能不升反降。

异步确认策略
代码实现
//异步确认策略
public static void Asynchronously () throws Exception {try (Connection connection = createConnection()) {Channel channel = connection.createChannel();//开启信道的确认模式channel.confirmSelect();//声明队列channel.queueDeclare(Constants.PUBLISHER_QUEUE3, true, false, false, null);//有序集合,元素按照自然顺序排序,存储为confirm消息序号SortedSet<Long> confirmSet= Collections.synchronizedSortedSet(new TreeSet<>());//生产者收到消息的ack和nack的消息,进行处理:channel.addConfirmListener(new ConfirmListener() {/***这里是生产者收到了ack信息,然后进行消息的处理*/@Overridepublic void handleAck(long l, boolean b) throws IOException {/*l: deliveryTag,每一个信息的ID* b: mutiple,表示是否批量处理*/if(b){//批量处理confirmSet.headSet(l+1).clear();}else{//单独处理confirmSet.remove(l);}}/***这里是生产者收到了nack信息,然后进行消息的处理*/@Overridepublic void handleNack(long l, boolean b) throws IOException {/*l:deliveryTag,每一个信息的ID* b:mutiple,表示是否批量处理*/if(b){//批量处理confirmSet.headSet(l+1).clear();}else{//单独处理confirmSet.remove(l);}//重发信息的逻辑}});//循环发送消息long start = System.currentTimeMillis();for (int i = 0; i < 200; i++) {String msg="消息:"+i;long nextId=channel.getNextPublishSeqNo();//得到下一次发送消息的序号,从1开始channel.basicPublish("",Constants.PUBLISHER_QUEUE2,null,msg.getBytes());//将序号存入集合中confirmSet.add(nextId);}//消息确认完毕while(!confirmSet.isEmpty()){Thread.sleep(10);}long end = System.currentTimeMillis();System.out.format("异步确认策略,用时:%d\n", end - start);}
}

重点API:

  • Channel接口提供了addConfirmListener方法,这个方法可以添加ConfirmListener回调接口
  • ConfirmListener该方法实现了对处于Confirm模式的信道的监听,可以分别对应处理RabbitMQ发送给生产者的acknack
  • ConfirmListener接口中包含两个重写方法:handleAck(long deliveryTag,boolean multiple)handleNack(long deliveryTag,boolean multiple)
    其中的两个参数deliveryTag 表示发送消息的序号,multiple 表示是否批量确认

三种策略的对比:
在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com