您的位置:首页 > 娱乐 > 八卦 > 网站策划初级方案模板_哪个平台可以随便发广告_网络工程师是干什么的_品牌营销策划方案范文

网站策划初级方案模板_哪个平台可以随便发广告_网络工程师是干什么的_品牌营销策划方案范文

2025/3/19 2:16:25 来源:https://blog.csdn.net/weixin_73144915/article/details/146234214  浏览:    关键词:网站策划初级方案模板_哪个平台可以随便发广告_网络工程师是干什么的_品牌营销策划方案范文
网站策划初级方案模板_哪个平台可以随便发广告_网络工程师是干什么的_品牌营销策划方案范文

一、简介

RabbitMQ 是一个基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它用于异步通信、解耦系统,提高系统的可扩展性和可靠性。它广泛应用于微服务架构、分布式系统、异步处理等场景。


1.RabbitMQ 的特点

  1. 支持多种协议:支持 AMQP 0.9.1、AMQP 1.0、MQTT、STOMP 等。
  2. 高可用性:支持集群部署、镜像队列等机制,确保消息不丢失。
  3. 可靠性:通过持久化、ACK 机制、事务支持等方式保证消息可靠传输。
  4. 灵活的路由机制:通过交换机(Exchange)和绑定(Binding)策略,实现复杂的消息路由规则。
  5. 插件机制:提供插件扩展,如管理插件、消息追踪插件、WebSocket 支持等。
  6. 轻量级、易于部署:基于 Erlang 语言开发,占用资源较低,支持 Docker、Kubernetes 部署。

2.RabbitMQ 的优缺点

(1)优点:
  1. 支持多种消息模式(点对点、发布订阅、路由等)。
  2. 社区活跃,生态丰富,官方提供大量插件,支持多种语言客户端(Java、Python、Go 等)。
  3. 支持消息持久化,即使服务器重启,消息也不会丢失。
  4. 支持集群模式,可扩展性强,适用于分布式环境。
  5. 性能较好,适用于中小型业务场景。
(2)缺点:
  1. 吞吐量较低:相比 Kafka,RabbitMQ 的吞吐量较低,不适合超大规模数据流处理。
  2. 管理运维复杂:需要手动管理队列、交换机、绑定等,运维成本相对较高。
  3. Erlang 生态较小,部分企业的运维人员对 Erlang 语言不熟悉,可能影响问题排查。
  4. 资源占用较大:在高并发场景下,RabbitMQ 的内存占用较多,可能影响性能。

二、核心组件

1. 生产者(Producer)

  • 负责发送消息到 RabbitMQ,消息最终会进入队列(Queue)。
  • 生产者不会直接把消息发到队列,而是先发送到交换机(Exchange)

2. 交换机(Exchange)

  • 负责接收生产者发送的消息,并根据绑定规则决定消息流向。
  • 交换机类型:
    • Direct(直连交换机):消息按照指定的 routing key 发送到匹配的队列。
    • Fanout(扇形交换机):消息广播到所有绑定的队列,不考虑 routing key
    • Topic(主题交换机):按模式匹配 routing key,适用于模糊匹配的场景(如 logs.*)。
    • Headers(头交换机):根据消息的 Header 属性进行路由。

3. 队列(Queue)

  • 存储消息,等待消费者(Consumer)消费。
  • 可配置是否持久化(避免 RabbitMQ 重启后消息丢失)。
  • 可设置死信队列(DLX),当消息超时未消费或被拒绝时,进入死信队列做后续处理。

4. 绑定(Binding)

  • 连接交换机和队列的桥梁,决定消息如何流转。
  • 绑定通常通过 routing key 进行匹配,或基于 headers 进行匹配。

5. 消费者(Consumer)

  • 从队列中获取消息并进行处理。
  • 支持手动 ACK 机制(保证消息消费后才被确认,避免丢失)。

6. 虚拟主机(Virtual Host,VHost)

  • RabbitMQ 服务器的逻辑隔离机制,不同 VHost 之间的消息互不影响。
  • 每个 VHost 维护自己的队列、交换机和权限控制。

7. RabbitMQ 连接(Connection)和信道(Channel)

  • Connection:TCP 连接,生产者和消费者都需要建立连接。
  • Channel:RabbitMQ 的逻辑连接,多个 Channel 共享一个 Connection,减少 TCP 连接开销。

三、交换机类型

RabbitMQ 中的交换机(Exchange)是消息路由的核心,决定了消息的流向。根据不同的路由策略,RabbitMQ 提供了几种交换机类型:

1. Direct Exchange(直连交换机)

  • 特点:消息通过交换机按 routing key 路由到匹配的队列。
  • 适用场景:精确匹配的消息路由。

在 Direct Exchange 中,生产者发送消息时指定 routing key,只有与队列绑定时的 routing key 完全匹配的队列会接收到消息。


2. Fanout Exchange(扇形交换机)

  • 特点:不关心 routing key,将消息广播到所有绑定的队列。
  • 适用场景:需要将消息发送给多个消费者或多个队列的场景(例如广播通知)。

Fanout Exchange 将消息发送到所有绑定的队列,完全不考虑 routing key


3. Topic Exchange(主题交换机)

  • 特点:使用 routing key 的模式匹配功能,支持通配符(*#)来路由消息。
  • 适用场景:根据模式灵活路由消息,比如日志系统中的级别过滤(如 logs.infologs.error)。

Topic Exchange 根据绑定的 routing key 模式进行匹配,* 匹配一个词,# 匹配多个词。比如 logs.info 匹配所有包含 logs.info 的消息。


4. Headers Exchange(头交换机)

  • 特点:通过匹配消息的 Header 信息进行路由,而不是 routing key
  • 适用场景:需要通过多个属性来过滤消息的情况,比如消息头包含多个属性,灵活的路由机制。

Headers Exchange 根据消息的 header 信息来路由,常用于需要灵活多维度匹配的场景。


总结

  • Direct Exchange:用于精确匹配的场景。
  • Fanout Exchange:用于广播场景,所有绑定的队列都会接收到消息。
  • Topic Exchange:用于模式匹配,灵活路由。
  • Headers Exchange:通过 header 属性进行路由,灵活多维度匹配。

四、创建方式

在 RabbitMQ 中,队列(Queue)、交换机(Exchange)、和绑定(Binding)是消息传递的核心元素。下面我会介绍它们在 Spring Boot 中的创建方式,包括使用 配置类注解方式

1.配置类方式

在配置类中,可以通过 @Bean 注解来声明队列、交换机和绑定关系。

@Configuration
public class RabbitConfig {// 创建队列@Beanpublic Queue queue() {return new Queue("myQueue", true);}// 创建 Direct 交换机@Beanpublic DirectExchange directExchange() {return new DirectExchange("directExchange");}// 创建绑定@Beanpublic Binding binding(Queue queue, DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("myRoutingKey");}
}
  • 队列(Queue):可以使用 @Bean 在配置类中声明。
  • 交换机(Exchange):同样使用 @Bean 创建,通常使用 DirectExchangeFanoutExchangeTopicExchange 等不同类型。
  • 绑定(Binding):队列和交换机的绑定通过 BindingBuilder.bind(queue).to(exchange).with(routingKey) 来配置。

2.注解方式

在 Spring Boot 中,@RabbitListener 注解可以用来监听 RabbitMQ 队列,并且可以通过 @QueueBinding 直接在注解中完成 队列、交换机、绑定 的声明(如果不存在就会创建)。

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "coupon.order.queue1", durable = "true"), // 队列exchange = @Exchange(name = "coupon.order.exchange", type = ExchangeTypes.DIRECT, durable = "true"), // 交换机key = {"coupon"} // 绑定的 routing key
))
public void receiveMessage(String message) {System.out.println("Received: " + message);
}

注意:RabbitMQ 默认发送和接收的消息是 字节数组(byte[]),在 Spring Boot 中,默认是字符串,但如果发送的是对象,需要手动配置 JSON 序列化 才能正确转换。需要在生产者和消费者都添加Bean注解:

@Beanpublic MessageConverter jacksonMessageConvertor() {return new Jackson2JsonMessageConverter();}

五、生产者确认机制

RabbitMQ 生产者确认机制(Publisher Confirms)用于确保消息从生产者 正确投递到交换机,并且 从交换机正确路由到队列,防止消息丢失。


1. 生产者确认机制的三种模式

首先配置.yml文件或.properties文件:

spring:rabbitmq:# 生产者网络连接失败重试设置,不建议使用template:retry:enabled: true        # 开启消费者重试(抛出异常时触发)initial-interval: 400ms  # 首次重试间隔multiplier: 1        # 间隔倍数(1表示固定间隔)max-attempts: 3      # 最大重试次数(含首次消费)# 生产者确认机制(可靠性保障关键配置)publisher-confirm-type: none  # 开启消息确认publisher-returns: correlated     # 开启消息路由失败通知

(1)ConfirmCallback(消息到达交换机,Exchange)

作用: 确保消息 成功到达交换机(Exchange)。

如果消息 到达交换机,返回 ack=true;如果 未到达交换机(如交换机不存在),返回 ack=false 并提供 cause 错误信息。

实现方式:

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息成功到达交换机!");} else {System.out.println("消息未到达交换机,原因:" + cause);}
});

(2)ReturnCallback(消息未投递到队列,Queue)

作用: 确保消息 正确路由到队列。

如果消息没有匹配任何队列(比如 routingKey 错误),触发 ReturnCallback

实现方式:

rabbitTemplate.setMandatory(true); // 必须设置为 true,否则不会触发回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.out.println("消息未投递到队列!");System.out.println("交换机:" + exchange);System.out.println("路由键:" + routingKey);System.out.println("错误代码:" + replyCode + ",错误信息:" + replyText);
});

(3)事务模式(不推荐)

作用: 生产者使用事务 channel.txCommit() 进行消息投递,失败时回滚 channel.txRollback()。但性能较低,不推荐使用。

 以上三种方法都能确认生产者的消息成功送达,但会带来系统额外的性能开销,因此都不推荐使用。非要使用,RabbitMQ 官方建议使用 ConfirmCallback。 

六、消费者确认机制

RabbitMQ 的消费者确认机制默认为none,这种模式下,RabbitMQ 不会要求消费者确认消息,消息会在消费者接收到时直接从队列中删除。 消费者确认机制(Consumer Acknowledgements)用于确保消息被正确消费,避免消息丢失或重复消费。主要有两种模式:

  1. 自动确认(autoAck = true)(不推荐,可能丢失消息)
  2. 手动确认(autoAck = false)(推荐,确保消息被正确处理)

1. 自动确认模式(Auto ACK)

默认情况下,RabbitMQ 采用自动确认,即消费者 收到消息后立即确认,无论业务逻辑是否执行成功,RabbitMQ 都会从队列中删除该消息。

代码示例:
@RabbitListener(queues = "testQueue")
public void receiveMessage(String message) {System.out.println("收到消息:" + message);int result = 1 / 0;  // 业务代码出错
}

消息 刚到达消费者 就被 RabbitMQ 删除,即使消费者 还没处理,也不会重新投递,导致消息丢失

2. 手动确认模式(Manual ACK)

手动确认允许消费者 成功处理消息后再手动确认,并可选择拒绝或重新入队,保证消息不会丢失。

代码示例:
@RabbitListener(queues = "testQueue", ackMode = "MANUAL")
public void receiveMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("收到消息:" + new String(message.getBody()));// 业务处理逻辑processMessage(new String(message.getBody()));// 处理成功,手动确认channel.basicAck(deliveryTag, false);} catch (Exception e) {System.out.println("消息处理失败:" + e.getMessage());// 处理失败,拒绝消息并重新入队channel.basicNack(deliveryTag, false, true);}
}

手动确认的三种方式如下:

方法作用说明
channel.basicAck(deliveryTag, false);确认消息处理成功后,通知 RabbitMQ 删除消息
channel.basicNack(deliveryTag, false, true);拒绝消息,并重新入队处理失败时,消息会重新回到队列
channel.basicReject(deliveryTag, false);拒绝消息,不重新入队直接丢弃消息

七、消息持久化

在 RabbitMQ 中,消息持久化是为了确保消息在 RabbitMQ 服务器崩溃重启 后,仍然能够被恢复。持久化可以保证即使 RabbitMQ 意外停止,消息不会丢失。

  • 临时消息(non-persistent): 默认情况下,消息在内存中存储,RabbitMQ 重启后会丢失。
  • 持久化消息(persistent): 消息存储在磁盘上,RabbitMQ 重启后可以恢复。

RabbitMQ 消息持久化涉及两个方面:

  1. 队列持久化
  2. 消息持久化
  3. 路由器持久化

1.设置队列和交换机持久化 

在声明队列和交换机时,需要设置队列的 durable 属性为 true,表示队列是持久化的。

@Beanpublic DirectExchange dlxExchange() {return new DirectExchange("dlx.exchange", true, false);}@Beanpublic Queue dlxQueue() {return new Queue("dlx.queue", true);}

2.设置消息持久化

为了使消息持久化,需要将消息的 deliveryMode 设置为 2(持久化)。如果是通过 Spring AMQP 发送消息,可以通过 MessagePostProcessor 来设置消息持久化。

通过 Spring AMQP 设置消息持久化:
@Autowired
private RabbitTemplate rabbitTemplate;public void sendMessage(String message) {MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 设置消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}};rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, messagePostProcessor);
}

八、延迟队列的实现

RabbitMQ 中实现 延迟队列(Delay Queue)通常有两种方式,分别是通过 插件 和 死信队列+TTL(Time-To-Live) 实现。延迟队列允许消息在特定的时间延迟后再被消费,通常用于实现任务调度、定时任务等功能。

(1)不用插件:

通过死信队列+TTL实现的。首先,我们将消息放到延迟队列中,将TTL设置为延迟时间,消息在TTL过期之后会被转发到死信交换机,再路由到死信队列,接着消费者到死信队列中消费消息,从而达到延迟消费的作用。

通过配置类创建死信交换机和死信队列,设置绑定和TTL 

@Configuration
public class DlxConfig {// 死信交换机@Beanpublic DirectExchange dlxExchange() {return new DirectExchange("dlx.exchange", true, false);}// 死信队列@Beanpublic Queue dlxQueue() {return new Queue("dlx.queue", true);}// 死信队列绑定@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx");}// 延迟交换机和队列@Beanpublic DirectExchange delayExchange() {return new DirectExchange("delay.exchange", true, false);}// 延迟队列@Beanpublic Queue delayQueue() {return QueueBuilder.durable("delay.queue").deadLetterExchange("dlx.exchange")  // 绑定死信交换机.deadLetterRoutingKey("dlx")         // 设置死信路由键.ttl(10 * 1000)                      // 设置消息TTL为10秒.build();}// 延迟队列和交换机绑定@Beanpublic Binding delayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay.routing.key");}
}

 测试类,发送消息到延迟队列:

@Test
public void test4() {LocalTime now = LocalTime.now(); // 获取当前时间DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化String msg = "hello, amqp";// 通过交换机发送消息rabbitTemplate.convertAndSend("delay.exchange", "delay.routing.key", msg);System.out.println("发送时间为:" + now.format(formatter));
}

 消费者,在死信队列中消费:

@RabbitListener(queues = "dlx.queue")
public void DelayedMessage(String msg) {LocalTime now = LocalTime.now(); // 获取当前时间DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化System.out.println("收到延迟消息时间为:" + now.format(formatter));
}

(2)使用插件:rabbitmq_delayed_message_exchange

消息发送到延迟交换机时,携带X-delay头(延迟时间),插件将消息暂存在内部数据库中,不会立即路由到队列,到期后将消息路由到目标队列; 

直接在消费者的监视器注解上配置信息:

// 使用插件的 延迟队列声明@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.exchange", delayed = "true"),key = "delay"))public void DelayMessage(String msg) {LocalTime now = LocalTime.now(); // 获取当前时间DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化System.out.println("收到延迟消息时间为:" + now.format(formatter));}

 测试类,设置TTL:

// 测试有延迟插件的 延迟队列@Testpublic void test5() {LocalTime now = LocalTime.now(); // 获取当前时间DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化String msg = "hello, amqp";rabbitTemplate.convertAndSend("delay.exchange", "delay", msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10 * 1000);   // 设置过期时间return message;}});System.out.println("第一次发送时间为:" + now.format(formatter));}

(3)使用插件和不使用插件的区别: 

  1. 消息存储位置:没有插件的需要额外创建一个队列用来存储消息,而有插件的将消息暂存在Mnesia数据库中,占用资源较低;
  2. 性能和误差:没有插件的需要轮询检查消息TTL(默认是每秒检查一次),而有插件的使用 Erlang Timer 模块 或 时间轮算法管理延迟时间,性能比较好,延迟误差较小;
  3. 消息阻塞:没有插件的队列只会检查对头消息是否过期,会导致后续消息被阻塞;而有插件的对每个消息的延迟独立计算,到期后立即触发路由;

九、消息重复消费的解决

消息重复消费是一个常见的 RabbitMQ 消息队列问题,它可能会影响系统的准确性和性能。为了解决消息重复消费的问题,可以采取以下几种策略:

1)开启消费者确认机制(手动ack或自动ack确保每次消费者成功处理完消息后,发送一个信号给RabbitMQ, 如果消费者处理失败,重新排队未确认的消息;

@RabbitListener(queues = "order.payment.queue", ackMode = "MANUAL")
public void handlePaymentMessage(Message message, Channel channel) {String orderId = new String(message.getBody());try {// 处理订单支付orderService.processPayment(orderId);// 确认消息已成功消费channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败,拒绝消息,并不重新入队channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}
}

(2)核心思想是保证幂等性:无论消息被消费多少次,结果与一次消费相同:

1.设置唯一标识(消息ID),将消息ID放到缓存中,每次消费前根据消息ID是否已存在来判定消息是否处理过;

@Autowired
private StringRedisTemplate redisTemplate;@RabbitListener(queues = "order.payment.queue")
public void handlePaymentMessage(String orderId) {String messageId = "payment:" + orderId;if (redisTemplate.opsForSet().isMember("processedMessages", messageId)) {return;  // 如果消息已经处理过,跳过}// 处理订单支付orderService.processPayment(orderId);// 将消息ID存入 Redis,防止重复消费redisTemplate.opsForSet().add("processedMessages", messageId);
}

2.利用数据库的唯一索引、主键的约束防止消息重复消费; 

    在消费者代码中,如果订单已存在(即订单ID已经在数据库中),则跳过该消息,从而避免重复消费:

    @RabbitListener(queues = "order.payment.queue")
    public void handlePaymentMessage(String orderId) {try {// 检查订单是否已支付(通过唯一索引判断是否重复消费)if (orderService.isOrderPaid(orderId)) {return;  // 如果订单已经支付,则跳过该消息}// 处理订单支付orderService.processPayment(orderId);} catch (Exception e) {// 处理异常e.printStackTrace();}
    }
    

    orderService.isOrderPaid(orderId) 方法中,我们可以查询数据库,检查订单是否已处理:

    public boolean isOrderPaid(String orderId) {// 通过唯一索引查询订单是否存在,如果存在则返回 true,表示已支付return jdbcTemplate.queryForObject("SELECT COUNT(1) FROM order_payment WHERE order_id = ?", Integer.class, orderId) > 0;
    }
    

    3.使用乐观锁,通过版本号或条件判断是否重复消费;

    十、异常消息的处理

    异常消息的处理是消息队列中一个重要的问题,尤其是在 RabbitMQ 中。消息处理过程中,可能会遇到各种异常情况,例如消费者处理消息时发生错误、消息无法被消费等。为了提高系统的可靠性和容错能力,RabbitMQ 提供了多种机制来处理异常消息。

    配置.yml文件或者.properties文件:

    spring:rabbitmq:listener:simple:prefetch: 1  # 每个消费者最大未确认消息数(设为1实现能者多劳模式)acknowledge-mode: auto  # 自动ACK(高风险!建议改为 manual 手动确认,避免消息丢失)retry:enabled: true          # 是否开启发送失败重试(仅针对网络波动场景,不解决业务异常)multiplier: 1          # 重试间隔时间倍数(此处为固定间隔)max-attempts: 3        # 最大重试次数(含首次发送)initial-interval: 200ms # 首次重试间隔

    创建异常交换机和异常队列:

    /*** 定义异常交换机和异常队列* 将异常的消息重试多次失败后,统一放在这个错误队列中,人工处理*/@Configuration
    @ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")   // 当这个属性存在且为true,配置类才生效
    public class ErrorConfig {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.exchange", true, false);}@Beanpublic Queue errorQueue() {return new Queue("error.queue", true);}@Beanpublic Binding errorBinding() {return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");}// 消息异常重试n此后,移到该异常交换机@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");}
    }

    总结

    RabbitMQ 作为轻量级的消息队列,适用于中小型业务场景,具备高可用性、可靠性和灵活的消息路由机制。然而,它的吞吐量不及 Kafka,且运维成本较高。对于需要事务保障、低延迟、灵活消息分发的业务,RabbitMQ 是一个不错的选择。

    版权声明:

    本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

    我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com