您的位置:首页 > 房产 > 建筑 > 四川seo哪家好_网络游戏吧_深圳做seo有哪些公司_杭州seo公司服务

四川seo哪家好_网络游戏吧_深圳做seo有哪些公司_杭州seo公司服务

2024/12/26 23:53:07 来源:https://blog.csdn.net/weixin_42924400/article/details/142692451  浏览:    关键词:四川seo哪家好_网络游戏吧_深圳做seo有哪些公司_杭州seo公司服务
四川seo哪家好_网络游戏吧_深圳做seo有哪些公司_杭州seo公司服务

kafka基本概念以及用法目录


文章目录

  • kafka基本概念以及用法目录
  • 一、什么是kafka?
  • 二、为什么要使用kafka?
    • 三、kafka的基本概念
    • 四、安装kafka(windows版本)
    • 五、命令行控制kafka生产消费数据,创建 删除topic
    • 六、java操作kafka消费生产


提示:以下是本篇文章正文内容,下面案例可供参考

一、什么是kafka?

kafka是一个分布式流处理工具,被全球大量公司广泛应用在大数据处理领域

二、为什么要使用kafka?

众所周知,在微服务领域或者大数据领域,需要服务和服务之间进行数据交换,数据通信,大数据领域系统和系统之间可能还有海量的数据交换压力。
1.传统的线程和线程之间的数据交换方式
在这里插入图片描述
jvm中会有一块公共的区域叫做堆内存,线程和线程之间会推送数据到堆内存中,其他线程需要获取数据就去堆内存获取
2.传统的进程和进程之间的数据交换方式
在这里插入图片描述

进程和进程之间是通过网络传输数据(Http,或者socket等常见网络传输协议)
但是不管是进程还是线程,传统这种数据传输交换方式,如果在海量数据高并发的场景下,如果接受数据方的内存跟不上推送的速度,就会引起内存溢出,堆内存溢出等生产问题。而kafka就是为了解决这个问题,孕育而生的。他充当了交换数据中间的一个中间件,类似一个消息队列的缓冲区

三、kafka的基本概念

一般市面上面的消息队列都遵循了JMS(Java Message Service)的传输规范
1.P2P(point to point)
在这里插入图片描述
2.PS(publish and subscribe) 发布订阅模式
在这里插入图片描述

四、安装kafka(windows版本)

1.下载kafka
https://kafka.apache.org/downloads
在这里插入图片描述
解压目录:
在这里插入图片描述
启动kafka需要依赖zookeeper,我们可以使用kafka自带的zookeeper
在这里插入图片描述
在log文件夹下面新建zk文件夹区分日志文件
在这里插入图片描述
修改配置文件中zookeeper文件日志文件位置
在这里插入图片描述
修改kafka运行日志保存位置
在这里插入图片描述
启动zookeeper和kafka
cmd到bin目录下面windows执行下面的bat脚本
在这里插入图片描述后面跟上刚才修改的配置文件
在这里插入图片描述
启动zookeeper成功 。
开始启动kafka
启动bat脚本 后面跟上刚才带上的配置文件
在这里插入图片描述
在这里插入图片描述
启动成功
tips:后续可能会出现kafka出现进程挂掉的报错
在这里插入图片描述
可以删除配置的两个文件夹下面的文件,重新启动zookeeper和kafka
在这里插入图片描述

五、命令行控制kafka生产消费数据,创建 删除topic

1.创建一个名为test的topic
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create
2.查看所有topic
kafka-topics.bat --bootstrap-server localhost:9092 --list
3.删除一个名为test的topic
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --delete
4.生产数据
kafka-console-producer.bat --broker-list localhost:9092 --topic test
5.消费数据
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
在这里插入图片描述
在这里插入图片描述

tips:可以加–help查看每个脚本后面的参数的具体用法
在这里插入图片描述

六、java操作kafka消费生产

先引入maven依赖:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version>
</dependency>

version根据自己的kafka版本做调整

生产者代码:

   public static void main(String[] args) {// 配置Kafka生产者Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka集群地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka生产者Producer<String, String> producer = new KafkaProducer<>(props);// 发送消息try {producer.send(new ProducerRecord<>("test", "key", "Message to send"));System.out.println("Message sent");} catch (Exception e) {e.printStackTrace();} finally {// 关闭生产者producer.close();}}

命令行接受到了生产数据
在这里插入图片描述
消费者代码:

 public static void main(String[] args) {// 配置Kafka消费者Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "test-group");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建Kafka消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(Arrays.asList("test"));// 轮询消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("Offset: %d, Key: %s, Value: %s\n", record.offset(), record.key(), record.value());}}}

再运行生产者代码 控制台就会打印出来消费的数据
在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com