您的位置:首页 > 游戏 > 手游 > rabbitmq消息投递失败

rabbitmq消息投递失败

2024/12/23 4:42:26 来源:https://blog.csdn.net/Casual_Lei/article/details/140429639  浏览:    关键词:rabbitmq消息投递失败

在 RabbitMQ 中,消息投递失败可能会发生在多个阶段,比如从生产者到交换机、从交换机到队列、从队列到消费者等。处理消息投递失败需要采取适当的措施来确保消息的可靠性和系统的健壮性。以下是处理不同阶段消息投递失败的方法:

1. 从生产者到交换机的投递失败

当生产者发送消息到交换机时,如果交换机不存在或者消息被交换机拒绝(比如 mandatory 参数设置为 true 而没有合适的队列),可以通过以下方式处理:

使用 Confirm 模式

生产者可以使用 RabbitMQ 的 Confirm 模式来确保消息成功发送到交换机。

import com.rabbitmq.client.*;public class Producer {private final static String EXCHANGE_NAME = "example_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.confirmSelect(); // Enable publisher confirmationsString message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "routing_key", null, message.getBytes("UTF-8"));if (channel.waitForConfirms()) {System.out.println(" [x] Message sent successfully");} else {System.out.println(" [x] Message delivery failed");}}}
}

2. 从交换机到队列的投递失败

如果消息发送到交换机后没有合适的队列绑定,可以使用 mandatory 参数和 ReturnListener 来处理未路由的消息。

import com.rabbitmq.client.*;public class Producer {private final static String EXCHANGE_NAME = "example_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {String message = new String(body, "UTF-8");System.out.println(" [x] Message returned: " + message);// 处理未路由的消息});String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "invalid_routing_key", true, null, message.getBytes("UTF-8"));}}
}

3. 从队列到消费者的投递失败

在消息从队列投递到消费者时,如果消费者无法处理消息,消费者可以使用 NackReject 来处理失败的消息。

使用手动消息确认

消费者可以手动确认消息,如果处理失败,可以选择 Nack 消息并重新入队,或者 Reject 消息并将其投递到死信队列(Dead Letter Queue)。

import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1); // Fair dispatchDeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {// 处理消息processMessage(message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {// 处理失败,重新入队或投递到死信队列channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);}};boolean autoAck = false; // Explicitly acknowledge messagechannel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });}}private static void processMessage(String message) throws Exception {// 消息处理逻辑if (message.contains("error")) {throw new Exception("Processing error");}System.out.println(" [x] Processed '" + message + "'");}
}

4. 使用死信队列(DLQ)

配置死信队列,当消息在队列中被拒绝、过期或者达到最大重试次数时,将其投递到死信队列以便后续处理。

配置示例
import com.rabbitmq.client.*;import java.util.HashMap;
import java.util.Map;public class DLQExample {private static final String MAIN_QUEUE = "main_queue";private static final String DLQ_QUEUE = "dlq_queue";private static final String EXCHANGE_NAME = "exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// Declare DLQchannel.queueDeclare(DLQ_QUEUE, true, false, false, null);// Declare main queue with DLQ settingsMap<String, Object> argsMap = new HashMap<>();argsMap.put("x-dead-letter-exchange", "");argsMap.put("x-dead-letter-routing-key", DLQ_QUEUE);channel.queueDeclare(MAIN_QUEUE, true, false, false, argsMap);// Declare exchange and bind main queuechannel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueBind(MAIN_QUEUE, EXCHANGE_NAME, "routing_key");String message = "Test Message";channel.basicPublish(EXCHANGE_NAME, "routing_key", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}

总结

为了处理 RabbitMQ 中的消息投递失败,应该综合使用以下策略:

  1. 使用 Confirm 模式和 ReturnListener 确保消息从生产者正确发送到交换机和队列。
  2. 使用手动消息确认机制处理消费者无法处理的消息。
  3. 配置死信队列处理无法处理或过期的消息。
  4. 确保消息处理逻辑具有幂等性,以防止重复处理导致的数据不一致。

通过这些方法,可以提高消息系统的可靠性和健壮性,确保消息不会丢失或重复处理。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com