您的位置:首页 > 房产 > 建筑 > MQ技术方案

MQ技术方案

2025/1/25 9:20:09 来源:https://blog.csdn.net/qq_46637011/article/details/142170167  浏览:    关键词:MQ技术方案

1. 保证MQ消息的可靠性

保证MQ消息的可靠性分两个方面:保证生产消息的可靠性、保证消费消息的可靠性。

1.1 保证生产消息的可靠性

1.1.1 重试机制

首先发送消息的方法如果执行失败会进行重试,这里我们在发送消息的工具类中使用spring提供的@Retryable注解,实现发送失败重试机制,通过注解的backoff属性指定重试等待策略,通过Recover注解指定失败回调方法,失败重试后仍然失败的会走失败回调方法,在回调方法中将失败消息写入一个失效消息表由定时任务进行补偿(重新发送),如果系统无法补偿成功则由人工进行处理,单独开发人工处理失败消息的功能模块。

通过@Retryable注解可以实现消息发送失败时的重试机制,避免因为网络抖动或临时故障导致的发送失败:

  • backoff属性用于定义重试间隔。
  • maxAttempts属性用于定义最大重试次数。
  • Recover注解用于指定重试失败后的回调逻辑,将未成功发送的消息记录到失败表中。
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000))
public void sendMessage(String message) {// 发送消息逻辑
}@Recover
public void recover(Exception e, String message) {// 发送失败的回调逻辑,如记录失败消息
}

1.1.2 生产者确认机制(Producer Confirm)

RabbitMQ提供了生产者确认机制,通过Publisher Confirm来确保消息从生产者成功发送到Broker,并最终到达队列。这一机制通过ACK(Acknowledgment)和NACK(Negative Acknowledgment)来确认消息的处理结果。

  • ACK: 表示消息已成功到达Broker。
  • NACK: 表示消息没有成功到达,可以根据NACK结果进行处理。

具体的做法:

  1. 消息唯一标识:每个消息发送时指定一个唯一ID,用于标识该消息的状态。
  2. 回调机制:通过设置回调方法(ConfirmCallback),可以监控ACK/NACK的回执状态。
    • 如果返回ACK,则表示消息已成功投递。
    • 如果返回NACK,生产者可根据NACK结果进行重发或记录到失败消息表,交给定时任务处理。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {// 消息成功投递System.out.println("消息成功发送到Broker,ID:" + correlationData.getId());} else {// 消息投递失败System.out.println("消息发送失败,原因:" + cause);// 记录到失败表}
});

1.1.3 ReturnCallback回调机制

在某些情况下,消息可能会成功到达Broker,但未能路由到队列。这时会触发ReturnCallback回调方法,通过它可以接收到未成功投递的消息,并进行相应的处理或补偿。

rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.out.println("消息无法路由,交换机:" + exchange + ",路由键:" + routingKey);// 补偿逻辑,如记录消息
});

1.1.4 定时任务

上述三种方法,将失败的消息写入到一个失败消息记录表,然后由定时任务进行补偿(重新发送),如果系统无法补偿成功则由人工进行处理,单独开发人工处理失败消息的功能模块。

1.2 保证消费消息可靠性

为了防止消息在Broker中丢失,可以将消息设置为持久化,具体需要设置交换机和队列支持持久化,发送消息设置deliveryMode=2。这样即使RabbitMQ重启,也能从磁盘中恢复未处理的消息。

1.2.1 重试机制

消费者在处理消息时,如果出现异常,可以使用重试机制来进行故障恢复。重试机制通常会和死信队列结合使用:

  • 当重试次数超过一定阈值,消息将被投递到失败消息队列,由定时任务或者人工处理。
  • 通过设置队列的x-dead-letter-exchangex-dead-letter-routing-key,可以指定消息在重试失败后进入失败消息队列。

1.2.2 消费确认机制(Consumer Acknowledgment)

RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理完成消息,RabbitMQ收到ACK后删除消息。

RabbitMQ提供了三种消费确认模式:

  1. 自动ACK(默认): RabbitMQ自动确认消息,不管消费者是否成功处理了消息,这种模式下容易丢失消息,不建议使用。
  2. 手动ACK: 需要消费者手动确认消息是否被处理成功,如果成功处理则发送ACK回执,RabbitMQ会删除该消息;如果处理失败,可以进行重试或将消息放入死信队列。
  3. 关闭ACK: 消费者不发送任何ACK,不推荐使用。

一般,我们都是使用默认的auto即可。

spring:rabbitmq:....listener:simple:acknowledge-mode: auto #,出现异常时返回nack,消息回滚到mq;没有异常,返回ackretry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 10 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

1.2.3 定时任务

重试机制达到上限后,将消息投递到失败消息队列,失败消息队列由定时任务程序定时处理,如果系统无法处理成功则由人工进行处理。

1.3 无法百分百保证MQ的消息可靠性

尽管通过生产者确认机制、消费者ACK机制、持久化、消息重试和补偿等措施可以极大提高MQ消息的可靠性,但由于不可控因素(如网络断开、硬件故障、进程异常终止等),无法百分之百保证消息的可靠性。因此,企业在设计使用MQ时,需要充分考虑异常情况的存在,并通过补偿机制、消息幂等性设计、消息重试等手段进一步降低消息丢失的风险。

2. 保证消息幂等性

2.1 幂等性是什么

幂等性(Idempotency)是指一个操作可以重复执行多次,但无论执行多少次,结果都是相同的,不会有副作用。对于分布式系统中使用消息队列的场景,幂等性通常用于保证消息不会因重复消费而导致数据错误。

举个例子:

假设你在电商系统中处理“用户下单”的消息,如果这个操作不是幂等的,用户下单的消息被重复消费时,可能会导致系统生成多笔订单、重复扣款。而如果“用户下单”这个操作是幂等的,不管消息被消费几次,最终都只会有一笔订单生成,避免重复处理。

2.2 为什么会有重复消费

MQ(消息队列)中,可能会发生消费者重复消费的情况。导致重复消费的原因有很多:

  1. 消费者处理失败:消费者处理消息时出现异常或超时,导致MQ认为消息未被成功处理,于是重新投递。
  2. 网络抖动:消费者收到消息后未能及时发送ACK确认,导致消息被再次投递。
  3. 业务逻辑错误:系统没有考虑幂等性设计,在同一条消息重复消费时导致数据出错。

2.3 如何保证MQ消息的幂等性

2.3.1 使用数据库的唯一约束控制幂等性

数据库的唯一约束可以帮助确保一条消息只被处理一次。例如,在插入一条订单数据时,可以为订单ID设置唯一约束。这样即使同一条消息被重复消费,数据库的唯一约束也会保证只有第一次插入成功,后续的重复插入会失败。

  • 适用场景: 比如订单系统,确保同一个订单号不会被重复处理。
CREATE TABLE orders (order_id VARCHAR(255) PRIMARY KEY, -- 唯一约束,订单IDuser_id VARCHAR(255),product_id VARCHAR(255),quantity INT,status VARCHAR(50)
);
try {// 尝试插入订单insertOrder(order);
} catch (DuplicateKeyException e) {// 捕获唯一键冲突异常,表明该订单已经存在,无需重复处理System.out.println("重复订单,不再处理");
}

2.3.2 使用Token机制

Token机制是另一种确保幂等性的常见手段。可以在发送消息时为每条消息指定一个唯一的标识符(Token或消息ID),然后在消费时通过Redis等缓存系统去判断该消息是否已经被消费过。

  1. 消息发送时生成唯一Token: 每条消息在生产时会生成一个唯一ID,比如UUID。

  2. 消息ID记录到Redis: 生产者在发送消息时,将该消息的ID存储到Redis,作为消息已经处理的标记。

  3. 消费者消费前先查询Redis: 当消费者接收到消息时,首先在Redis中查找该消息ID,如果发现ID已经存在,说明该消息已经被处理过,则跳过该次处理。

  4. 消费完成后,记录消息ID到Redis: 消费者成功处理完消息后,将该消息ID记录到Redis,表示该消息已成功消费。

String messageId = message.getId(); // 消息唯一ID// 检查消息是否已处理
if (redisTemplate.hasKey(messageId)) {// 如果存在,表明消息已处理过,跳过return;
}// 处理消息
processMessage(message);// 消费完成后,将消息ID存储到Redis,设置过期时间
redisTemplate.opsForValue().set(messageId, "consumed", 10, TimeUnit.MINUTES);

优点: 这种方式适合需要临时性幂等检查的场景,过期时间可以根据业务场景进行设置。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com