您的位置:首页 > 教育 > 培训 > zookeeper集群+kafka集群

zookeeper集群+kafka集群

2024/10/6 14:36:20 来源:https://blog.csdn.net/qq_52448810/article/details/140998373  浏览:    关键词:zookeeper集群+kafka集群

zookeeper集群+kafka集群

zookeeper是一个开源的,分布式的,为分布式架构提供协调服务的APACHE的项目。
保存元数据。数据的走向,发送路径不依赖zookeeper。

zookeeper工作机制:

观察者模式设计的分布式服务器管理架构。
负责存储和管理元数据,记录集群的变化。保存集群变化的信息。

zookeeper的特点:
1、在集群中分为领导者和追随者,组成的集群。
2、只要有半数以上的节点正常工作,整个zookeeper就可以正常工作。(zookeeper在部署时一般奇数台)
3、全局的数据一致。每个zookeeper不论是领导者还是追随者,在访问他们数据时都是一致的。
4、数据更新的原子性,一次更新数据,要么都成功,要么都失败
5、数据更新的实时性能。
6、领导者和追随者根据投票产生

选举机制:

A B C
1、服务器A 启动 发起一次选举,A会投自己一票。A有一票,不够半数。选举无法完成。A进入looking1状态
2、服务器B 启动 再发起一次选举,服务器B也投自己一票,服务器A和服务器B做个比较,myid,谁myid大,
如果A比B小,A会把票改投给B,2票,B自动当选为leader
3、C启动了,自动成为追随者,A也会成为追随者

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

yum install -y java 
java -version
mv apache-zookeeper-3.5.7-bin /opt/zookeeper
cd zookeeper/
cd conf/
ls
cp zoo_sample.cfg   zoo.cfg
vim zoo.cfg2 tickTime=2000
3 #通信心跳时间,zookeeper服务端和客户端之间通信的时间,单位毫秒
6 initLimit=10
7 #leader和follower初始连接时,最多能容忍的心跳数。秒
10 syncLimit=5
11 #leader和follower之间同步通信的超时时间。如果5*2的时间,发生同步超时,leader就认为fo    llower死了,会把他从集群当中删除19 clientPort=218120 #服务端口
 15 dataDir=/opt/zookeeper/data16 #数据保存目录17 dataLogDir=/opt/zookeeper/logs21 server.1=192.168.11.144:3188:3288
192.168.11.144服务器的ip地址
3188:zookeeper集群内部通信的端口
3288:重新选举端口,万一leader挂了,用这个端口进行内部通信,选举新的leader22 server.2=192.168.11.145:3188:328823 server.3=192.168.11.146:3188:3288mkdir /opt/zookeeper/data
mkdir /opt/zookeeper/logs
vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/opt/zookeeper'
case $1 in
start)echo "---------- zookeeper 启动 ------------"$ZK_HOME/bin/zkServer.sh start
;;
stop)echo "---------- zookeeper 停止 ------------"$ZK_HOME/bin/zkServer.sh stop
;;
restart)echo "---------- zookeeper 重启 ------------"$ZK_HOME/bin/zkServer.sh restart
;;
status)echo "---------- zookeeper 状态 -----
chmod +x /etc/init.d/zookeeper 
chkconfig -add zookeeper
chkconfig --add zookeeper
echo 1 > /opt/zookeeper/data/myid    #三台设备每台对应数值不一样对一台对应1,第二台2,第三台3
service zookeeper start
service zookeeper status

kafka概述

消息队列:MQ
在高并发额环境下,同步的请求来不及处理,请求太多会造成阻塞。
比如大量请求并发到数据库,too many connection报错。
消息队列,使用异步处理方式,可以缓解系统处理请求的压力。

kafka作用:

1、异步处理
2、系统解耦
每个系统之间独立运行,互相之间没有必然的依赖关系。
微服务架构中,通信对于解耦来说至关重要
各个微服务之间独立运行,分别处理各自的请求和消息。提高整个系统的吞吐量和能力。
尤其是电商的订单系统,网站的工单系统,典型的一个消息队列场景。
3、负载均衡
消息队列的负载均衡:把任务发送到多个消费者,多个消费者可以并行处理队列中的消息。
4、流量控制和限流
通过延迟方法,处理生产速率和消费者的处理速度(代码控制)
5、数据同步和分发
跨系统的数据同步和日志收集
6、任务调度和定时任务
7、实时数据处理
8、备份和恢复

消息队列的模式:

1、点对点(已淘汰) 一对一 消费者消费完数据之后,生产者会自动清除已经消费的数据
一个生产者对应一个消费者
2、发布/订阅模式:(一对多,观察者模式,消费者数据在消费完之后不会清除(保留一段时间))
生产者发布一个消息,可以是一个消费者使用,也可以是多个消费者同时使用。(主流)

kafka就是发布/订阅模式的消息队列
主要用于大数据实时处理领域 RabbitMQ也是发布/订阅模式的消息队列(小集群内部使用)。

kafka的特性:

1、高吞吐,低延迟
每秒可以处理几十万条数据,延迟只有几毫米
2、集群的可扩展性(热扩展)
可以吧消息的持久化:生产者发布的消息可以保存到磁盘当中,防止数据丢失(有时间限制)
3、容错性
挂了一个可以继续使用
4、高并发
数千个客户端可以同时读写

kafka组件(面试)

topic 主题 kafka的基本单元,所有生产者发布的消息都是发到主题。
消费者订阅主题,然后消息生产者发布的消息。
生产者 生产者把消息发布到主题
消费者 订阅主题,消费生产者发布的消息。

**偏移量:**消息在分区当中的唯一标识,跟踪和定位消息所在的位置。消费者可以根据偏移量来处理信息。

分区 每个主题都可以分成多个分区,每个分区都是数据的有序子集。
分区当中,保留数据,按照偏移量来有序的存储数据。消费可以根据偏移量来消费指定的分区当中的消息(一般不用)
有分区必须有偏移量
分区还有备份的作用:创建主题的时候创建分区,创建分区时要指定副本数。
分区和我们指定的集群机器数量一般保持一致的。
副本:备份分区的消息,最少要两个,互为本分

经纪人broker 经纪人处理生产者和消费者的请求(kafka)元数据(zookeeper)
zookeeper:保存元数据(ip地址、集群信息)

kafka工作流程(面试):

生产者将消息发布到指定的主题,每个消息都附带一个key和value。
主题有多个分区,生产者将消息写入一个分区(带偏移量)
经纪人(kafka):负责分配和处理生产者的发布请求,偏移量也是经纪人分配(在分区当中是唯一的)。
消费者订阅主题,获取全量的消费者的消费信息(默认模式),也可以从执行的分区获取消息(代码来完成,一般不使用)
生产者发布的消息会在本地保留一段时间,防止消费者有延迟或者处理慢,导致没有成功消费。(默认保留:7天)

在这里插入图片描述

192.168.11.144   kafka    主
192.168.11.145   kafka    客户端1
192.168.11.146   kafka    客户端2主是生产者,客户端1和客户端2是消费者192.168.11.144 
-----------------------------------------------kafka主配置----------------------------------------------------------
[root@myslq4 opt]# tar -xf kafka_2.13-3.4.1.tgz 
[root@myslq4 opt]# mv kafka_2.13-3.4.1 /usr/local/kafka
[root@myslq4 opt]# cd /usr/local/kafka/config/
[root@myslq3 config]# vim server.properties
24 broker.id=0      每台主机必须不一样
34 listeners=PLAINTEXT://192.168.11.144:9092
44 num.network.threads=3
#broker处理网络请求的线程数,一般不动
47 num.io.threads=8
#处理磁盘读写的线程数,数制一定大于磁盘数量
51 socket.send.buffer.bytes=102400
#发送套接字的缓冲区的大小
54 socket.receive.buffer.bytes=102400#接收套接字缓冲区的大小
57 socket.request.max.bytes=104857600#请求套接字缓冲区的大小
63 log.dirs=/usr/local/kafka/logs
64 #日志存放路径
68 num.partitions=1
69 #创建主题时,经纪人指定的分区数,如果指定的分区数不同,这个值可以覆盖
106 log.retention.hours=168
107 #消息的本地持久化保留的时间,168小时
126 zookeeper.connect=192.168.11.144:2181,192.168.11.145:2181,192.168.11.146:2181[root@myslq3 config]# vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[root@myslq3 config]# source /etc/profile
[root@myslq3 config]# vim /etc/init.d/kafka 
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)echo "---------- Kafka 启动 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo "---------- Kafka 状态 ------------"count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac[root@myslq3 config]# service kafka start
---------- Kafka 启动 ------------/usr/local/kafka/bin/kafka-topics.sh/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.11.144:9092,192.168.11.145:9092,192.168.11.146:9092 --replication-factor 2 --partitions 3 --topic test9
创建主题命令
--replication-factor 2:创建分区的副本数,最少2个
--partition 3:分区数
--topic test9:主题名称kafka-console-producer.sh --broker-list 192.168.11.144:9092,192.168.11.145:9092,192.168.11.146:9092 --topic test9
生产者生产消息命令kafka-console-consumer.sh --bootstrap-server 192.168.11.144:9092,192.168.11.145:9092,192.168.11.146:9092 --topic test15 --from-beginning    
消费者消费生产者生产的消息kafka-topics.sh --describe --bootstrap-server 192.168.11.144:9092,192.168.11.145:9092,192.168.11.146:9092
#查询集群中的主题详细信息kafka-topics.sh --list --bootstrap-server 192.168.11.144:9092,192.168.11.145:9092,192.168.11.146:9092
#列出Kafka集群中当前存在的所有主题service zookeeper status   查看状态
192.168.11.145
-----------------------------------------------kafka客户机1---------------------------------------------------------
[root@myslq4 opt]# tar -xf kafka_2.13-3.4.1.tgz 
[root@myslq4 opt]# mv kafka_2.13-3.4.1 /usr/local/kafka
[root@myslq4 opt]# cd /usr/local/kafka/config/
[root@myslq3 config]# vim server.properties
24 broker.id=1    
#每台主机必须不一样
34 listeners=PLAINTEXT://192.168.11.145:9092
44 num.network.threads=3
#broker处理网络请求的线程数,一般不动
47 num.io.threads=8
#处理磁盘读写的线程数,数制一定大于磁盘数量
51 socket.send.buffer.bytes=102400
#发送套接字的缓冲区的大小
54 socket.receive.buffer.bytes=102400#接收套接字缓冲区的大小
57 socket.request.max.bytes=104857600#请求套接字缓冲区的大小
63 log.dirs=/usr/local/kafka/logs
64 #日志存放路径
68 num.partitions=1
69 #创建主题时,经纪人指定的分区数,如果指定的分区数不同,这个值可以覆盖
106 log.retention.hours=168
107 #消息的本地持久化保留的时间,168小时
126 zookeeper.connect=192.168.11.144:2181,192.168.11.145:2181,192.168.11.146:2181[root@myslq3 config]# vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[root@myslq3 config]#source /etc/profile
[root@myslq3 config]# vim /etc/init.d/kafka 
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)echo "---------- Kafka 启动 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo "---------- Kafka 状态 ------------"count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac[root@myslq3 config]# service kafka start
---------- Kafka 启动 ------------kafka-topics.sh --describe --bootstrap-server 192.168.11.144:9092,192.168.11.145:9092,192.168.11.146:9092
#查询集群中的主题详细信息kafka-topics.sh --list --bootstrap-server 192.168.11.144:9092,192.168.11.145:9092,192.168.11.146:9092
#列出Kafka集群中当前存在的所有主题service zookeeper status   查看状态
192.168.11.146
-----------------------------------------------kafka客户机2---------------------------------------------------------
[root@myslq4 opt]# tar -xf kafka_2.13-3.4.1.tgz 
[root@myslq4 opt]# mv kafka_2.13-3.4.1 /usr/local/kafka
[root@myslq4 opt]# cd /usr/local/kafka/config/
[root@myslq3 config]# vim server.properties
24 broker.id=2      每台主机必须不一样
34 listeners=PLAINTEXT://192.168.11.146:9092
44 num.network.threads=3
#broker处理网络请求的线程数,一般不动
47 num.io.threads=8
#处理磁盘读写的线程数,数制一定大于磁盘数量
51 socket.send.buffer.bytes=102400
#发送套接字的缓冲区的大小
54 socket.receive.buffer.bytes=102400#接收套接字缓冲区的大小
57 socket.request.max.bytes=104857600#请求套接字缓冲区的大小
63 log.dirs=/usr/local/kafka/logs
64 #日志存放路径
68 num.partitions=1
69 #创建主题时,经纪人指定的分区数,如果指定的分区数不同,这个值可以覆盖
106 log.retention.hours=168
107 #消息的本地持久化保留的时间,168小时
126 zookeeper.connect=192.168.11.144:2181,192.168.11.145:2181,192.168.11.146:2181[root@myslq3 config]# vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[root@myslq3 config]#source /etc/profile
[root@myslq3 config]# vim /etc/init.d/kafka 
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)echo "---------- Kafka 启动 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo "---------- Kafka 状态 ------------"count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac[root@myslq3 config]# service kafka start
---------- Kafka 启动 ------------kafka-topics.sh --describe --bootstrap-server 192.168.11.144:9092,192.168.11.145:9092,192.168.11.146:9092
#查询集群中的主题详细信息kafka-topics.sh --list --bootstrap-server 192.168.11.144:9092,192.168.11.145:9092,192.168.11.146:9092
#列出Kafka集群中当前存在的所有主题service zookeeper status   查看状态

总结:
zookeeper 就是保存集群的元数据
kafka 工作流程:
组件的作用 消费者、生产者、
kafka消息堆积如何解决?
主要原因:主要在于消费者出现延迟或者处理能力太差,导致消息堆积
解决办法:1、减少kafka持久化保存时间
2、修改主题的分区数,扩大分区数量,提高消费者获取的通道
3、可以指定多个消费者共同工作,处理消息的积压。

#查询集群中的主题详细信息

kafka-topics.sh --list --bootstrap-server 192.168.11.144:9092,192.168.11.145:9092,192.168.11.146:9092
#列出Kafka集群中当前存在的所有主题

service zookeeper status 查看状态


总结:
zookeeper 就是保存集群的元数据
**kafka 工作流程:**
**组件的作用**    消费者、生产者、
kafka消息堆积如何解决?
主要原因:主要在于消费者出现延迟或者处理能力太差,导致消息堆积
解决办法:1、减少kafka持久化保存时间2、修改主题的分区数,扩大分区数量,提高消费者获取的通道 3、可以指定多个消费者共同工作,处理消息的积压。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com