您的位置:首页 > 汽车 > 时评 > 南昌网站建设费用_中国纪检监察报刘一霖_seo关键词有哪些类型_seo分析案例

南昌网站建设费用_中国纪检监察报刘一霖_seo关键词有哪些类型_seo分析案例

2025/1/18 13:59:56 来源:https://blog.csdn.net/yqq962464/article/details/142363880  浏览:    关键词:南昌网站建设费用_中国纪检监察报刘一霖_seo关键词有哪些类型_seo分析案例
南昌网站建设费用_中国纪检监察报刘一霖_seo关键词有哪些类型_seo分析案例

引言

对业务系统来说,丢消息意味着数据丢失,这是无法接受的。

主流的消息队列产品都提供了非常完善的消息可靠性保证机制,完全可以做到在消息传递过程中,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息。

绝大部分丢消息的原因都是由于开发者不熟悉消息队列,没有正确使用和配置消息队列导致的。

虽然不同的消息队列提供的 API 不一样,相关的配置项也不同,但是在保证消息可靠传递这块儿,它们的实现原理是一样的。

检测消息丢失

消息队列的有序性

在这里插入图片描述
大概流程如下:

  1. 发送端在拦截器中,给每条消息附加一个连续递增的序号;
  2. 消费端在拦截器中检测消息的连续性,如果消息没有丢失,消息序号必然是库中保存的上一次消费到的消息序号+1;

对于 kafka 和 rocketmq,不保证在 Topic 上的严格顺序,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。

如果 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

除了在拦截器生成消息序号外,我们也可以利用消息位移来实现。

位移(Offset)是消息队列中用于标识消息位置的指标,它指向了消息在队列中的确切位置。在如Kafka这样的消息队列中,位移是消费者读取消息的起始点。消费者在读取消息后,会更新位移,表明已经读取了这些消息。如果位移连续,那么可以认为没有消息丢失。如果位移不连续,比如位移从100直接跳到了102,那么101的消息就可能丢失了。

例如:

 public void onMessage(List<ConsumerRecord<String, EventRecord>> data, Acknowledgment acknowledgment) {try {/*** * 业务处理* * */long initStartOffset = redisCluster.exists(String.format(Constants.INIT_OFFSET, bizName))? Long.valueOf(redisCluster.get(String.format(Constants.INIT_OFFSET, bizName))): 0L;if (initStartOffset == 0) {redisCluster.set(String.format(Constants.INIT_OFFSET, bizName), String.valueOf(data.stream().mapToLong(ConsumerRecord<String, EventRecord>::offset).max().getAsLong()));acknowledgment.acknowledge();return;}long minOffsetRecord = data.stream().mapToLong(ConsumerRecord<String, EventRecord>::offset).min().getAsLong();long maxOffsetRecord = data.stream().mapToLong(ConsumerRecord<String, EventRecord>::offset).max().getAsLong();log.info("initStartOffset={}, minOffsetRecord={}, maxOffsetRecord={}", initStartOffset, minOffsetRecord, maxOffsetRecord);if (minOffsetRecord - initStartOffset > 1) {log.info(String.format(Constants.INIT_OFFSET, bizName) + "存在数据丢失。。。。。。。。。。。。。。。。。。minOffsetRecord = " + minOffsetRecord+ ",initStartOffset = " + initStartOffset);consumeUtils.sendDingWarn("同步消费异常", bizName + "数据丢失!!!!!!!");}redisCluster.set(String.format(Constants.INIT_OFFSET, bizName), String.valueOf(maxOffsetRecord));acknowledgment.acknowledge();} catch (Exception e) {throw new RuntimeException("kafka message process error!", e);}}

在检测到消息丢失时,我们可以钉钉报警,也可以直接抛出异常,停止消费。

确保消息可靠传递

在这里插入图片描述

  • 生产阶段: 消息在 Producer 创建出来,经过网络传输发送到 Broker 端;
  • 存储阶段: 消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上;
  • 消费阶段: Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

在上面三个阶段中,哪些会发生消息丢失呢?

生产阶段

在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。

有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。

以 Kafka 为例,我们看一下如何可靠地发送消息:同步发送时,只要注意捕获异常即可。

try {RecordMetadata metadata = producer.send(record).get();System.out.println("消息发送成功。");
} catch (Throwable e) {System.out.println("消息发送失败!");System.out.println(e);
}

异步发送时,则需要在回调方法里进行检查。这个地方是需要特别注意的,很多丢消息的原因就是,我们使用了异步发送,却没有在回调中检查发送结果。

producer.send(record, (metadata, exception) -> {if (metadata != null) {System.out.println("消息发送成功。");} else {System.out.println("消息发送失败!");System.out.println(exception);}
});

存储阶段

在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。

对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。

如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

比如,上面的那段代码,只有业务逻辑处理完成后,才会发送消费确认。

acknowledgment.acknowledge();

参考资料
《消息队列高手课》

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com