您的位置:首页 > 新闻 > 热点要闻 > 百度云链接_免费网站建设建议_免费开源网站_html简单网页代码

百度云链接_免费网站建设建议_免费开源网站_html简单网页代码

2024/10/11 12:55:51 来源:https://blog.csdn.net/happycao123/article/details/142217992  浏览:    关键词:百度云链接_免费网站建设建议_免费开源网站_html简单网页代码
百度云链接_免费网站建设建议_免费开源网站_html简单网页代码

序列化

图片

  • kafka 需要将消息内容序列化(Serializer)成字节数组才能发送到 Broken节点

  • 消费者需要将字节数组反序列化(Deserializer)为消息内容,然后消费消息。接口定义如下

public interface Serializer<T> extends Closeable {default void configure(Map<String, ?> configs, boolean isKey) {}byte[] serialize(String var1, T var2);default byte[] serialize(String topic, Headers headers, T data) {return this.serialize(topic, data);}default void close() {}
}
public interface Deserializer<T> extends Closeable {default void configure(Map<String, ?> configs, boolean isKey) {}T deserialize(String var1, byte[] var2);default T deserialize(String topic, Headers headers, byte[] data) {return this.deserialize(topic, data);}default void close() {}
}

常用序列化器

kafka 内置了许多实现如 StringSerializer、IntegerSerializer、DoubleSerializer 等。

生产者与消费者 选择的序列化与反序列化要匹配才能正常解析

自定义序列化器

自定义序列化器 只需要实现对应的 Serializer,UserDeserializer

比如有一个 User 类

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {private Long id;private String userName;
}

UserSerializer

public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic byte[] serialize(String s, User user) {return JSON.toJSONBytes(user);}@Overridepublic void close() {}
}
public class UserDeserializer implements Deserializer<User> {@Overridepublic User deserialize(String s, byte[] bytes) {return JSON.parseObject(bytes,User.class);}
}

生产者与消费者配置

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);

上面我们配置了 User 相关的序列化,但是实际上项目发送的消息内容肯定不止 User 这一种,我们需要针对不同类型指定不同的序列化方式

DelegatingByTypeSerializer

从 2.8 开始新增了DelegatingByTypeSerializer完美解决了上面问题用法如下,

    @Beanpublic ProducerFactory producerFactory() {Map<Class<?>, Serializer> delegates = new HashMap<>();delegates.put(byte[].class, new ByteArraySerializer());delegates.put(Bytes.class, new BytesSerializer());delegates.put(String.class, new StringSerializer());delegates.put(User.class, new UserSerializer());return new DefaultKafkaProducerFactory<>(produceConfigs(),new StringSerializer(), new DelegatingByTypeSerializer(delegates));}

消费者序列化可以在   @KafkaListener 指定

完整配置

确保kafka-clients版本在2.8以上

kafka 配置类

@Configuration
@EnableKafka
public class KafkaConfig {private Map<String, Object> produceConfigs() {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);return configMap;}@Beanpublic ProducerFactory producerFactory() {Map<Class<?>, Serializer> delegates = new HashMap<>();delegates.put(byte[].class, new ByteArraySerializer());delegates.put(Bytes.class, new BytesSerializer());delegates.put(String.class, new StringSerializer());delegates.put(User.class, new UserSerializer());return new DefaultKafkaProducerFactory<>(produceConfigs(),new StringSerializer(), new DelegatingByTypeSerializer(delegates));}@Beanpublic KafkaTemplate kafkaTemplate() {return new KafkaTemplate(producerFactory());}private Map<String, Object> consumerConfigs() {Map<String, Object> configMap = new HashMap<>();configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:9092");configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,IntegerDeserializer.class);configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);configMap.put(ConsumerConfig.GROUP_ID_CONFIG, "groupTest");return configMap;}}

消费者  @KafkaListener 注解可以指定序列化器

 @KafkaListener(topics = "test")public void processMessage(final ConsumerRecord<String,User> record) {System.out.println("processMessage:" + JSON.toJSONString(record.value()));}@KafkaListener(topics = "topic2",properties = "value-deserializer:org.apache.kafka.common.serialization.StringDeserializer")public void processMessage(String content) {System.out.println("processMessage:" + content);}

发送消息代码

for (int i = 0; i < 100; i++) {messageProduce.sendMessage(new User(Long.valueOf(i), "zhangsan" + i));messageProduce.sendMessage2("hello world" + i);
}

结果验证

图片

消息被正常消费

总结

        本文主要介绍了自定义序列化方式,以及为不同类型指定不同的序列化实现方式。

        由于篇幅有限,文中只包含核心配置和重要代码,部分代码未贴出,可留言交流学习。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com