您的位置:首页 > 游戏 > 游戏 > 大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用

大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用

2024/10/7 2:32:20 来源:https://blog.csdn.net/w776341482/article/details/140788537  浏览:    关键词:大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka (正在更新…)

章节内容

上节我们完成了:

  • topics.sh、producer.sh、consumer.sh 脚本的基本使用
  • pom.xml 配置
  • JavaAPI的使用:producer 和 consumer

在这里插入图片描述

架构图

上节已经出现过了,这里再放一次
在这里插入图片描述

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>springboot-kafka</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.2.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

配置文件

我们常见的配置文件如下图:

spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializertemplate:default-topic: my-topic

Producer

编写代码

编写了一个KafkaProducerController
里边写了两个方法,都是使用了 KafkaTemplate 的工具。

@RestController
public class KafkaProducerController {@Resourceprivate KafkaTemplate<Integer, String> kafkaTemplate;@RequestMapping("/sendSync/{message}")public String sendSync(@PathVariable String message) {ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 1, message);ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record);try {SendResult<Integer, String> result = future.get();System.out.println(result.getProducerRecord().key() + "->" +result.getProducerRecord().partition() + "->" +result.getProducerRecord().timestamp());} catch (Exception e) {e.printStackTrace();}return "Success";}@RequestMapping("/sendAsync/{message}")public String sendAsync(@PathVariable String message) {ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 2, message);ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record);future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送失败!");ex.printStackTrace();}@Overridepublic void onSuccess(SendResult<Integer, String> result) {System.out.println("发送成功");System.out.println(result.getProducerRecord().key() + "->" +result.getProducerRecord().partition() + "->" +result.getProducerRecord().timestamp());}});return "Success";}}

测试结果

http://localhost:8085/sendSync/wzktest1
http://localhost:8085/sendAsync/wzktest2
http://localhost:8085/sendAsync/wzktest222222

我们观察控制台的效果如下:
在这里插入图片描述

Consumer

编写代码

编一个类来实现Consumer:

@Configuration
public class KafkaConsumer {@KafkaListener(topics = {"wzk_topic_test"})public void consume(ConsumerRecord<Integer, String> consumerRecord) {System.out.println(consumerRecord.topic() + "\t"+ consumerRecord.partition() + "\t"+ consumerRecord.offset() + "\t"+ consumerRecord.key() + "\t"+ consumerRecord.value());}}

测试运行

2024-07-12 13:48:46.831  INFO 15352 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=13, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=h121.wzk.icu:9092 (id: 0 rack: null), epoch=0}}
2024-07-12 13:48:46.926  INFO 15352 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : wzk-test: partitions assigned: [wzk_topic_test-0]
wzk_topic_test	0	13	1	wzktest
wzk_topic_test	0	14	2	wzktest222
wzk_topic_test	0	15	2	wzktest222222

控制台的截图如下:
在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com