Kafka是一种高吞吐量的分布式发布订阅消息系统。
1 Kafka
Kafka具有以下特性:
- 分布式系统:可以部署在多个节点上,实现负载均衡和容错性。
- 发布/订阅模式:允许消息的生产者和消费者解耦,提高系统的可扩展性和灵活性。
- 高吞吐量:可以处理大量数据流,满足实时数据处理的需求。
- 持久性:将消息持久化到磁盘上,保证了数据的可靠性和稳定性。
- 顺序性:可以保证在同一个分区内的消息的顺序性(不同分区的不能保证)。
应用场景:
1)日志收集与聚合:收集来自各个应用程序的日志并将其发送到中央存储和处理系统。
2)消息队列:用于异步通信,解耦生产者和消费者,处理大量消息。
1.1 基本概念
Zookeeper | 是一个分布式协调服务。Kafka用它管理和维护集群状态,例如Broker的加入、退出及Leader选举等。 |
Broker | 服务代理节点或Kafka服务实例。大多数情况下可以将Broker看作一台Kafka服务器(前提是这台服务器只部署了一个Kafka实例)。一个或多个Broker组成了一个Kafka集群。 |
Producer | 生产者,发送消息的一方。负责创建消息,然后将其投递到Kafka中。 |
Consumer | 消费者,接收消息的一方。消费者连接到Kafka上并接收消息,进而进行相应的逻辑处理。 |
Topic | 主题,将消息进行归类的单位。Kafka中,每条消息都要指定一个主题。生产者负责将消息发送到特定主题,而消费者负责订阅主题并进行消费。 |
Partition | 分区,是主题的子集。一个主题可以被分割成多个分区。分区允许将数据分布在多个服务器(Broker)上。每个分区在物理上一个有序、不可变的消息序列,存储为一系列连续的、有序的日志文件。同一个主题下的不同分区包含的消息是不同的。 |
表 Kafka的基本概念
1.1.1 多副本机制
Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量来提升容灾能力。同一个分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样)。副本之间是“一主多从”的关系。其中leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。
副本处于不同的broker中,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务。
AR | Assigned Replicas,分区中的所有副本。 |
ISR | In-Sync Replicas,所有与leader副本保持一定程度同步的副本(包括leader)。是AR的一个子集。 只有在ISR集合中的副本才有资格被选举为新的leader。 |
OSR | Out-of-Sync Replicas,与leader副本同步滞后过多的副本。 AR = ISR + OSR。 |
图 分区中的副本
1.1.2 消费端的容灾能力
消费端使用拉(Pull)模式从服务端拉起消息,并保存消费的具体位置,当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费。
1.1.3 HW和LEO
HW,High Watermark,高水位。标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。
LEO,Log End Offset,标识当前日志文中下一条待写入消息的offset。ISR集合中最小的LEO即为分区的HW。
1.2 快速搭建Kafka服务
版本号2.11,从官网下载。然后将kafka_2.11.tar解压即可。zookeeper的默认端口为2181。可以在config/zookeeper.properties 文件中指定端口。kafka的默认端口为9092,在config/server.properties 文件中指定端口。
1.启动Zookeeper。
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
-daemon 表示以守护进程形式在后台启动该进程。
2.启动Kafka。
bin/kafka-server-start.sh -daemon config/server.properties
3.创建主题topic-demo1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-demo1
4.查看集群下的主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
5.启动生产者客户端
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo1
图 启动生产者客户端并发送消息
6.启动消费者客户端
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo1 --from-beginning
图 启动消费者客户端并接收消息
1.2.1 Zookeeper 服务
1.连接Zookeeper 服务
bin/zookeeper-shell.sh localhost:2181
2.列出/brokers/topics下的节点信息
ls /brokers/topics
图 连接Zookeeper服务
1.3 搭建Kafka集群
1.3.1 在同一台服务器上部署三个broker
1.复制server.properties 文件。
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
2.修改配置文件
config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dirs=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dirs=/tmp/kafka-logs-
3.启动broker
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties
4.创建主题mult-topic1,其拥有1个包含3个副本的分区。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic mult-topic1
5.查看mult-topic1 主题的详细信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mult-topic1
图 mult-topic1 的详细信息