您的位置:首页 > 财经 > 金融 > 淘宝店铺装修免费全套模板_质量可靠的网站网页归档_私人网站管理软件_百度搜索引擎的特点

淘宝店铺装修免费全套模板_质量可靠的网站网页归档_私人网站管理软件_百度搜索引擎的特点

2024/12/23 21:47:55 来源:https://blog.csdn.net/cwzb/article/details/144287305  浏览:    关键词:淘宝店铺装修免费全套模板_质量可靠的网站网页归档_私人网站管理软件_百度搜索引擎的特点
淘宝店铺装修免费全套模板_质量可靠的网站网页归档_私人网站管理软件_百度搜索引擎的特点

使用 kafka-clients 原本是比较简单的事情。但有些同学习惯了 spring-kafka 后,对原始 java 接口会陌生些。会希望有个集成的示例。

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version>
</dependency>

现在我们使用原始 sdk 的依赖包,做一个 solon 项目的集成分享(其它的框架,也可以参考此例)。

1、添加集成配置

使用 Solon 初始器 生成一个 Solon Web 模板项目,然后添加上面的 kafka-clients 依赖。之后:

  • 添加 yml 配置(具体的配置属性,参考:ProducerConfig,ConsumerConfig)
solon.app:name: "demo-app"group: "demo"solon.logging:logger:root:level: INFO# 配置可以自由定义,与 @Bean 代码对应起来即可(以下为参考)
solon.kafka:properties:  #公共配置(配置项,参考:ProducerConfig,ConsumerConfig 的公用部分)bootstrap:servers: "127.0.0.1:9092"key:serializer: "org.apache.kafka.common.serialization.StringSerializer"deserializer: "org.apache.kafka.common.serialization.StringDeserializer"value:serializer: "org.apache.kafka.common.serialization.StringSerializer"deserializer: "org.apache.kafka.common.serialization.StringDeserializer"producer: #生产者专属配置(配置项,参考:ProducerConfig)acks: "all"consumer: #消费者专属配置(配置项,参考:ConsumerConfig)enable:auto:commit: "false"isolation:level: "read_committed"group:id: "${solon.app.group}:${solon.app.name}"
  • 添加 java 配置器
@Configuration
public class KafkaConfig {@Beanpublic KafkaProducer<String, String> producer(@Inject("${solon.kafka.properties}") Properties common,@Inject("${solon.kafka.producer}") Properties producer) {Properties props = new Properties();props.putAll(common);props.putAll(producer);return new KafkaProducer<>(props);}@Beanpublic KafkaConsumer<String, String> consumer(@Inject("${solon.kafka.properties}") Properties common,@Inject("${solon.kafka.consumer}") Properties consumer) {Properties props = new Properties();props.putAll(common);props.putAll(consumer);return new KafkaConsumer<>(props);}
}

完成上面两步,就算是集成了(后面,是应用的事儿)。配置也可以没有,直接写代码设定属性。

2、应用

  • 发送(或生产),这里代控制器由用户请求再发送消息(仅供参考):
@Controller
public class DemoController {@Injectprivate KafkaProducer<String, String> producer;@Mapping("/send")public void send(String msg) {//发送producer.send(new ProducerRecord<>("topic.test", msg));}
}
  • 拉取(或消费),这里采用定时拦取方式:(仅供参考)

这里需要引入一个 solon 的简单调度插件(或,别的调度插件),用于定时拉取消息:

<dependency><groupId>org.noear</groupId><artifactId>solon-scheduling-simple</artifactId>
</dependency>

编写定时拉取任务:

@Component
public class DemoJob {@Injectprivate KafkaConsumer<String, String> consumer;@Initpublic void init() {//订阅consumer.subscribe(Arrays.asList("topic.test"));}@Scheduled(fixedDelay = 10_000L, initialDelay = 10_000L)public void job() throws Exception {//拉取ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());//确认consumer.commitSync();}}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com