您的位置:首页 > 房产 > 家装 > 邯郸网络作家村_新手做网站_软文推广新闻发布_武汉seo系统

邯郸网络作家村_新手做网站_软文推广新闻发布_武汉seo系统

2025/3/6 21:06:31 来源:https://blog.csdn.net/remsqks/article/details/145965557  浏览:    关键词:邯郸网络作家村_新手做网站_软文推广新闻发布_武汉seo系统
邯郸网络作家村_新手做网站_软文推广新闻发布_武汉seo系统

官网地址:https://kafka.apache.org/

1. 概念

Kafka是一个由Scala和Java语言开发的,经典高吞吐量的分布式消息发布和订阅系统,也是大数据技术领域中用作数据交换的核心组件之一。以高吞吐,低延迟,高伸缩,高可靠性,高并发,且社区活跃度高等特性,从而备受广大技术组织的喜爱。

1.1 消息队列

Kafka软件最初的设计就是专门用于数据传输的消息系统,类似功能的软件有RabbitMQ、ActiveMQ、RocketMQ等。这些软件名称中的MQ是英文单词Message Queue的简称,也就是所谓的消息队列的意思。这些软件的核心功能是传输数据,而Java中如果想要实现数据传输功能,那么这个软件一般需要遵循Java消息服务技术规范JMS(Java Message Service)。

Kafka拥有作为一个消息系统应该具备的功能,但是却有着独特的设计。可以这样说,Kafka借鉴了JMS规范的思想,但是却并没有完全遵循JMS规范。这也恰恰是软件名称为Kafka,而不是KafkaMQ的原因。

1.2 JMS

JMS是Java平台的消息中间件通用规范,定义了主要用于消息中间件的标准接口。如果不是很理解这个概念,可以简单地将JMS类比为Java和数据库之间的JDBC规范。Java应用程序根据JDBC规范种的接口访问关系型数据库,而每个关系型数据库厂商可以根据JDBC接口来实现具体的访问规则。JMS定义的就是系统和系统之间传输消息的接口。

JMS支持两种消息发送和接收模型:一种是P2P(Peer-to-Peer)点对点模型,另外一种是发布/订阅(Publish/Subscribe)模型。

  • P2P模型: P2P模型是基于队列的,消息生产者将数据发送到消息队列中,消息消费者从消息队列中接收消息。因为队列的存在,消息的异步传输成为可能。P2P模型的规定就是每一个消息数据,只有一个消费者,当发送者发送消息以后,不管接收者有没有运行都不影响消息发布到队列中。接收者在成功接收消息后会向发送者发送接收成功的消息。
  • 发布/订阅模型: 所谓得发布订阅模型就是事先将传输的数据进行分类,我们管这个数据的分类称之为主题(Topic)。也就是说,生产者发送消息时,会根据主题进行发送。比如咱们的消息中有一个分类是NBA,那么生产者在生产消息时,就可以将NBA篮球消息数据发送到NBA主题中,这样,对NBA消息主题感兴趣的消费者就可以申请订阅NBA主题,然后从该主题中获取消息。这样,也就是说一个消息,是允许被多个消费者同时消费的。这里生产者发送消息,我们称之为发布消息,而消费者从主题中获取消息,我们就称之为订阅消息。Kafka采用就是这种模型。

1.3 生产者-消费者模式

生产者-消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不着生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个消息缓冲区,平衡了生产者和消费者的处理能力。在数据传输过程中,起到了一个削弱峰值的作用,也就是我们经常说到的削峰。
在这里插入图片描述

图形中的缓冲区就是用来给生产者和消费者解耦的。在单点环境中,我们一般会采用阻塞式队列实现这个缓冲区。而在分布式环境中,一般会采用第三方软件实现缓冲区,这个第三方软件我们一般称之为中间件。纵观大多数应用场景,解耦合最常用的方式就是增加中间件。

1.4 消息中间件对比

特性ActiveMQRabbitMQRocketMQKafka
单机吞吐量万级,比RocketMQ,Kafka低一个数量级万级,比RocketMQ,Kafka低一个数量级10万级,支持高吞吐10万级,支持高吞吐
Topic数量对吞吐量的影响Topic可以达到几百/几千量级Topic可以达到几百量级,如果更多的话,吞吐量会大幅度下降
时效性ms级微秒级别,延迟最低ms级ms级
可用性高,基于主从架构实现高可用高,基于主从架构实现高可用非常高,分布式架构非常高,分布式架构
消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置,可以做到0丢失经过参数优化配置,可以做到0丢失
功能支持MQ领域的功能极其完备并发能力强,性能极好,延时很低MQ功能较为完善,分布式,扩展性好功能较为简单,支持简单的MQ功能,在大数据领域被广泛使用
其他很早的软件,社区不是很活跃开源,稳定,社区活跃度高阿里开发,社区活跃度不高开源,高吞吐量,社区活跃度极高

1.5 ZooKeeper

ZooKeeper是一个开放源码的分布式应用程序协调服务软件。在当前的Web软件开发中,多节点分布式的架构设计已经成为必然,那么如何保证架构中不同的节点所运行的环境,系统配置是相同的,就是一个非常重要的话题。一般情况下,我们会采用独立的第三方软件保存分布式系统中的全局环境信息以及系统配置信息,这样系统中的每一个节点在运行时就可以从第三方软件中获取一致的数据。也就是说通过这个第三方软件来协调分布式各个节点之间的环境以及配置信息。Kafka软件是一个分布式事件流处理平台系统,底层采用分布式的架构设计,就是说,也存在多个服务节点,多个节点之间Kafka就是采用ZooKeeper来实现协调调度的。"

ZooKeeper的核心作用:

l ZooKeeper的数据存储结构可以简单地理解为一个Tree结构,而Tree结构上的每一个节点可以用于存储数据,所以一般情况下,我们可以将分布式系统的元数据(环境信息以及系统配置信息)保存在ZooKeeper节点中。

l ZooKeeper创建数据节点时,会根据业务场景创建临时节点或永久(持久)节点。永久节点就是无论客户端是否连接上ZooKeeper都一直存在的节点,而临时节点指的是客户端连接时创建,断开连接后删除的节点。同时,ZooKeeper也提供了Watch(监控)机制用于监控节点的变化,然后通知对应的客户端进行相应的变化。Kafka软件中就内置了ZooKeeper的客户端,用于进行ZooKeeper的连接和通信。

2. Java API

pom依赖

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.1</version></dependency>
</dependencies>

KafkaProducerTest 生产者

package com.atguigu.kafka.test.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;/*** @author WangTingWei* @Date 2025/3/2 14:13* @description*/
public class KafkaProducerTest {public static void main(String[] args) {// TODO 配置属性集合HashMap<String, Object> configMap = new HashMap<>();// TODO 配置属性:Kafka 服务器集群地址configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");// TODO 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对k,v进行对应的序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// TODO 创建Kafka生产者对象,建立Kafka连接// 构造对象时,需要传递配置产参数KafkaProducer<String,String> producer = new KafkaProducer<String, String>(configMap);// TODO 准备数据,定义泛型for (int i = 0; i < 10; i++) {// 构造对象时需要传递【Topic主题名称】,【Key】,【Value】三个参数ProducerRecord<String, String> record = new ProducerRecord<>("test", "key"+i, "value"+i);// TODO 生产(发送)数据producer.send(record);}// TODO 关闭生产者连接producer.close();}
}

KafkaConsumerTest 消费者

package com.atguigu.kafka.test.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;/*** @author WangTingWei* @Date 2025/3/2 14:31* @description*/
public class KafkaConsumerTest {public static void main(String[] args) {//  TODO 配置属性集合HashMap<String, Object> configMap = new HashMap<>();// TODO 配置属性:Kafka 集群地址configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");// TODO 配置属性: Kafka 传输的数据为Kv对,所以需要对获取的数据发分别进行序列化反序列化configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// TODO 配置属性:读取数据的位置,取值为 earliest(最早),latest(最晚)configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");// TODO 配置属性:消费者组configMap.put("group.id","atguigu");// TODO 配置属性:自动提交偏移量configMap.put("enable.auto.commit","true");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configMap);// TODO 消费者订阅指定主题的数据consumer.subscribe(Collections.singletonList("test"));while (true){// TODO 每隔100毫秒,抓取一次数据ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// TODO 打印抓取的数据for (ConsumerRecord<String, String> record : records) {System.out.println("k = "+record.key()+", V = "+record.value());}}}
}

3. Kafka Tool

在这里插入图片描述

点击左上角按钮File -> Add New Connection…建立连接
在这里插入图片描述

新增 Topic
在这里插入图片描述

新增 Message
在这里插入图片描述
查看

在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com