您的位置:首页 > 新闻 > 热点要闻 > kafka发送消息流程

kafka发送消息流程

2025/1/10 10:50:08 来源:https://blog.csdn.net/weixin_44794897/article/details/140346096  浏览:    关键词:kafka发送消息流程

在这里插入图片描述
配置props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);

public Map<String,Object> producerConfigs(){Map<String,Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);return props;
}public ProducerFactory producerFactory(){return new DefaultKafkaProducerFactory<>(producerConfigs());
}// 覆盖spring-kafka中的配置
@Bean
public KafkaTemplate<String,Object> kafkaTemplate(){return new KafkaTemplate<String,Object>(producerFactory());
}

自定义消息拦截器

public class CustomerProducerInterceptor implements ProducerInterceptor<String,Object> {// 发送消息时,对消息拦截。@Overridepublic ProducerRecord<String,Object> onSend(ProducerRecord producerRecord) {System.out.println("拦截消息" + producerRecord.toString());return null;}// 服务器是否收到了当前这条消息@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {if(recordMetadata != null){System.out.println("服务器收到消息" + recordMetadata.offset());}else{// 没有收到消息发送失败System.out.println("消息发送失败!!!");}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com