您的位置:首页 > 汽车 > 时评 > Kafka基础入门-代码实操

Kafka基础入门-代码实操

2025/1/7 12:01:23 来源:https://blog.csdn.net/f_yuqing/article/details/140422687  浏览:    关键词:Kafka基础入门-代码实操

   Kafka是基于发布/订阅模式的消息队列,消息的生产和消费都需要指定主题,因此,我们想要实现消息的传递,第一步必选是创建一个主题(Topic)。下面我们看下在命令行和代码中都是如何创建主题和实现消息的传递的。

使用命令行操作Kafka

使用命令行操作主题

  • 使用kafka-topics.sh脚本来实现对Topic的操作
sh kafka-topics.sh

   执行命令之后,我们可以找到到下面这行提示,REQUIRED代表必须的,就是说我们想要实现对Kafak的操作必须要带有这个参数,表示我们要连接的Kafka具体服务。

--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect  connect to>                              to.   

   接下来,就让我们创建一个主题吧。

# --bootstrap-server  用于指定我们连接的Kafka服务地址,9092是默认端口号  
# --topic  指定要操作的Topic名称  
# --create 表示本次是要创建一个主题  
sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test  --create
# 执行结果
Created topic test.

   查看下我们的主题是否创建成功

sh kafka-topics.sh --bootstrap-server localhost:9092 --list    
# 执行结果
test

   查看某一个主题的详细信息

sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test  --describe  
# 执行结果
Topic: test	TopicId: ehyjS3R3Saq8Cx2V1x0p7g	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824Topic: test	Partition: 0	Leader: 1	Replicas: 1	Isr: 1

使用命令行消费数据

  • 我们通过kafka-console-consumer.sh来生产消息。
 sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test#输出
hello kafka

使用命令行生产数据

  • 我们通过kafka-console-consumer.sh来生产消息。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test  
# 输入
>hello kafka

   想了解如何启动Kafka,可以看这篇文章《Kafka基础入门》。

使用代码操作Kafka

   添加依赖包

 <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.7.1</version></dependency></dependencies>

生产者代码

        // 创建配置对象Map<String,Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"lcoalhost:9092");// 对生产的数据的K,V 进行序列化的操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSelection.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSelection.class.getName());// 创建生产者对象//      生产者需要设定泛型:数据类型的约束KafkaProducer<String,String> producer = new KafkaProducer<String,String>(configMap);// 创建数据//      构建数据时,需要传递三个参数//          第一个参数表示主题名称,主题不存在时会自动创建//          第二个参数表示数据的Key//          第二个参数表示数据的ValueProducerRecord<String,String> record = new ProducerRecord<String,String>("test","key","value");// 通过生产者对象,将数据发送到Kafkaproducer.send(record);//关闭生产者对象producer.close();

消费者代码

        // 创建配置对象Map<String,Object> configMap = new HashMap<>();configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"lcoalhost:9092");// 对生产的数据的K,V 进行反序列化的操作configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSelection.class.getName());configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSelection.class.getName());configMap.put(ConsumerConfig.GROUP_ID_CONFIG,"com.kafka.test");// 创建消费者对象KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String,String>(configMap);//订阅主题kafkaConsumer.subscribe(Collections.singleton("test"));// 从Kafka中获取数据//      消费者从Kafka中拉取数据ConsumerRecords<String, String> datas = kafkaConsumer.poll(1000);datas.forEach(data ->{System.out.println(data);});// 关闭消费者对象kafkaConsumer.close();

在这里插入图片描述
点击下方名片,关注『编程青衫客』
随时随地获取最新好文章!

在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com