最近在研究rocketmq5.x的运行机制,研究到高可用章节,看到rocketMq采用了主从机制实现高可用,将broker分成了master和slave。为了更好的理解主从源码,我觉着需要先搭建一个主从的集群,先了解主从集群是怎么使用的。
这篇文章,写完之后,我校了好几遍稿,也按照这个文档重新搭建了一遍,所以如果你的rocketmq相关目录和我是一致的,跟着文档搭建,最多1个小时内就能搞定,因为只需要你改动一下相关配置文件中的ip就行。
文章目录
- 1、5.x架构图
- 2、准备服务器
- 3、服务器安全组配置
- 4、下载rocketmq-5.2.0
- 5、配置环境变量
- 6、配置hosts
- 7、主从配置文件修改
- 7.1、配置文件说明
- 7.2、手动创建目录
- 7.3、master1配置
- 7.4、slave2配置文件
- 7.5、master2配置文件
- 7.6、slave1配置文件
- 8、proxy配置文件
- 8.1、47.xxx.xxx.xxx配置
- 8.2、39.xxx.xxx.xxx配置
- 9、JVM参数修改
- 10、启动脚本
- 10.1、47.xxx.xxx.xxx启动脚本
- 10.2、39.xxx.xxx.xxx启动脚本
- 11、集群验证
- 12、发送消息、消费消息验证
- 12.1、创建topic
- 12.2、rocketmq5.x的maven依赖
- 12.3、生产者
- 12.4、消费者
- 13、dashboard监控
- 13.1、下载dashboard
- 13.2、修改dashboard配置
- 13.3、启动dashboard
1、5.x架构图
这是我在网上找到的一张5.x的架构图,整体上先了解一下5.x的架构。
注意这张图是一主两从,不是我们要搭建的双主双从。
解释一下这张图。
5.x和4.x最大的不同就是5.x引入了一个proxy组件,引入这个组件的目的就是为了存算分离,更好的拥抱云原生
客户端不再连接namesrv,而是和proxy连接,proxy和namesrv交互获取broker的路由信息,和broker交互
2、准备服务器
我一共准备了两台阿里云的服务器。
一台机器的ip是47开头的,47.xxx.xxx.xxx
一台机器的ip是39开头的,39.xxx.xxx.xxx
两台服务器扮演的角色如下:
机器 | 角色 |
---|---|
47.xxx.xxx.xxx | master1,slave2,namesrv1,proxy1 |
39.xxx.xxx.xxx | master2,slave1,namesrv2,proxy2 |
master1,slave2,意思是master1节点和master2的slave节点部署在一起机器上,这是为了保证高可用,主从节点不能部署到一台机器上。
3、服务器安全组配置
服务器安全组,就是开通rocketmq相关的访问端口
需要开通的端口及作用如下
端口 | 作用 |
---|---|
9876 | namesrv端口 |
11011 | 主从同步端口 |
10911 | broker监听的端口 |
10909、11009 | rocketmq-dashboard连接的端口 |
28081 | grpcRemote协议端口,客户端通过该端口连接proxy |
两台机器都要开通上述端口
4、下载rocketmq-5.2.0
直接到官网下载就行了,官网下载地址
下载到本地后,把目录改短一点,我文章中改成了rocketmq-5.2.0
5、配置环境变量
两台机器的配置是一样的。
rocketmq的安装路径,建议和我的路径一致。要不然要改好多地方
ROCKETMQ_HOME=/opt/application/rocketmq-5.2.0
PATH=$PATH:$ROCKETMQ_HOME/bin
6、配置hosts
两台机器的配置是一样的
改动点:把xxx.xxx.xxx的地方换成你的ip
# namesrv域名地址
47.xxx.xxx.xxx rocketmq-nameserver1
39.xxx.xxx.xxx rocketmq-nameserver2# broker域名配置
47.xxx.xxx.xxx rocketmq-master1
47.xxx.xxx.xxx rocketmq-slave2
39.xxx.xxx.xxx rocketmq-master2
39.xxx.xxx.xxx rocketmq-slave1
7、主从配置文件修改
7.1、配置文件说明
rocketmq主从配置分为同步模式和异步模式,我们这次搭建的是同步模式。
配置文件的路径是rocketmq-5.2.0/conf/2m-2s-sync,这些配置文件是官方给准备好的。
这个目录下一共有4个配置文件。
#master1从机配置
broker-a-s.properties
#master1配置
broker-a.properties
#master2从机配置
broker-b-s.properties
#master2配置
broker-b.properties
这里顺便提一下,如果你电脑用的是sublime编辑器,我推荐你装一个sftp的插件,然后连接上2台服务器,这样修改配置文件要方便很多。
7.2、手动创建目录
主从配置的存储目录需要我们手动创建,创建命令如下
mkdir -p /usr/local/rocketmq/store
mkdir -p /usr/local/rocketmq/store/commitlog
mkdir -p /usr/local/rocketmq/store/consumequeue
mkdir -p /usr/local/rocketmq/store/indexmkdir -p /usr/local/rocketmq/store_s
mkdir -p /usr/local/rocketmq/store_s/commitlog
mkdir -p /usr/local/rocketmq/store_s/consumequeue
mkdir -p /usr/local/rocketmq/store_s/index
两台机器都要执行
7.3、master1配置
改动点:把xxx.xxx.xxx的地方换成你的ip,其他不用改动
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样,brokerName相同的机器为一对主从
brokerName=broker-a
# 0 表示 Master,>0 表示 Slave
brokerId=0
#所在机器的ip地址
brokerIP1=47.xxx.xxx.xxx
# nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/store
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=65536
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=SYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
7.4、slave2配置文件
改动点:把xxx.xxx.xxx的地方换成你的ip,其他不用改动
# 配置信息如下:
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
# 0 表示 Master,>0 表示 Slave
brokerId=1
#所在机器的ip地址
brokerIP1=47.xxx.xxx.xxx
# nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=11011
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/store_s
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store_s/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store_s/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store_s/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store_s/checkpoint
# abort 文件存储路径
abortFile=/usr/local/rocketmq/store_s/abort
# 限制的消息大小
maxMessageSize=65536
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=SLAVE
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
7.5、master2配置文件
改动点:把xxx.xxx.xxx的地方换成你的ip,其他不用改动
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样,brokerName相同的机器为一对主从
brokerName=broker-b
# 0 表示 Master,>0 表示 Slave
brokerId=0
#所在机器的ip地址
brokerIP1=39.xxx.xxx.xxx
# nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/store
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=65536
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=SYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
7.6、slave1配置文件
改动点:把xxx.xxx.xxx的地方换成你的ip,其他不用改动
# 配置信息如下:
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0 表示 Master,>0 表示 Slave
brokerId=1
#所在机器的ip地址
brokerIP1=39.xxx.xxx.xxx
# nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=11011
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/store_s
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store_s/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store_s/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store_s/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store_s/checkpoint
# abort 文件存储路径
abortFile=/usr/local/rocketmq/store_s/abort
# 限制的消息大小
maxMessageSize=65536
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=SLAVE
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
8、proxy配置文件
rocketmq-5.2.0多了proxy配置文件。配置文件路径
rocketmq-5.2.0/conf/rmq-proxy.json
8.1、47.xxx.xxx.xxx配置
改动点:把xxx.xxx.xxx的地方换成你的ip,其他不用改动
{"rocketMQClusterName": "rocketmq-cluster","namesrvAddr":"rocketmq-nameserver1:9876;rocketmq-nameserver2:9876","brokerIP1":"47.xxx.xxx.xxx","remotingListenPort":28080,"grpcServerPort":28081
}
28081是grpc协议端口,就是proxy的端口,客户端要连接这个端口,发送消息或者消费消息
8.2、39.xxx.xxx.xxx配置
改动点:把xxx.xxx.xxx的地方换成你的ip,其他不用改动
{"rocketMQClusterName": "rocketmq-cluster","namesrvAddr":"rocketmq-nameserver1:9876;rocketmq-nameserver2:9876","brokerIP1":"39.xxx.xxx.xxx","remotingListenPort":28080,"grpcServerPort":28081
}
9、JVM参数修改
JVM参数修改是改rocketmq的启动脚本。配置文件在rocketmq-5.2.0/bin目录下,一共需要修改2个配置文件
runbroker.sh、runserver.sh
默认情况下,broker的jvm堆大小是8g。
我买的机器,内存只有2g,所以要改小一点,否则mq的相关组件根本起不来。
经过验证,测试使用,128m就足够了。
-server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m
10、启动脚本
因为要启动的角色有点多,所以写一个shell脚本,一键启动所有角色
脚本中引用的log文件需要自己手动创建。两台机器都要创建
创建log文件的命令
mkdir /opt/application/rocketmq-5.2.0/logs/
touch /opt/application/rocketmq-5.2.0/logs/mqnamesrv.log
touch /opt/application/rocketmq-5.2.0/logs/mqbroker.log
touch /opt/application/rocketmq-5.2.0/logs/mqbroker-s.log
touch /opt/application/rocketmq-5.2.0/logs/mqproxy.log
如果你的目录路径和我的不一样,自己改下
10.1、47.xxx.xxx.xxx启动脚本
改动点:把xxx.xxx.xxx的地方换成你的ip,其他不用改动
#!/bin/bash
ROCKETMQ_DIR=/opt/application/rocketmq-5.2.0
CONFIG_DIR=$ROCKETMQ_DIR/conf/2m-2s-sync
NAMESRV_ADDRESSES="47.xxx.xxx.xxx:9876;39.xxx.xxx.xxx:9876"nohup sh $ROCKETMQ_DIR/bin/mqnamesrv > "/opt/application/rocketmq-5.2.0/logs/mqnamesrv.log" 2>&1 &
sleep 5
nohup sh $ROCKETMQ_DIR/bin/mqbroker -c $CONFIG_DIR/broker-a.properties > "/opt/application/rocketmq-5.2.0/logs/mqbroker.log" 2>&1 &
sleep 5
nohup sh $ROCKETMQ_DIR/bin/mqbroker -c $CONFIG_DIR/broker-b-s.properties > "/opt/application/rocketmq-5.2.0/logs/mqbroker-s.log" 2>&1 &
sleep 5
nohup sh "${ROCKETMQ_DIR}/bin/mqproxy" -n "${NAMESRV_ADDRESSES}" -pc "${ROCKETMQ_DIR}/conf/rmq-proxy.json" > "${ROCKETMQ_DIR}/logs/mqproxy.log" 2>&1 &
10.2、39.xxx.xxx.xxx启动脚本
改动点:把xxx.xxx.xxx的地方换成你的ip,其他不用改动
#!/bin/bash
ROCKETMQ_DIR=/opt/application/rocketmq-5.2.0
CONFIG_DIR=$ROCKETMQ_DIR/conf/2m-2s-sync
NAMESRV_ADDRESSES="47.xxx.xxx.xxx:9876;39.xxx.xxx.xxx:9876"nohup sh $ROCKETMQ_DIR/bin/mqnamesrv > "/opt/application/rocketmq-5.2.0/logs/mqnamesrv.log" 2>&1 &
sleep 5
nohup sh $ROCKETMQ_DIR/bin/mqbroker -c $CONFIG_DIR/broker-b.properties > "/opt/application/rocketmq-5.2.0/logs/mqbroker.log" 2>&1 &
sleep 5
nohup sh $ROCKETMQ_DIR/bin/mqbroker -c $CONFIG_DIR/broker-a-s.properties > "/opt/application/rocketmq-5.2.0/logs/mqbroker-s.log" 2>&1 &
sleep 5
nohup sh "${ROCKETMQ_DIR}/bin/mqproxy" -n "${NAMESRV_ADDRESSES}" -pc "${ROCKETMQ_DIR}/conf/rmq-proxy.json" > "${ROCKETMQ_DIR}/logs/mqproxy.log" 2>&1 &
11、集群验证
2台机器,各要启动4个进程,如下
#broker
1425 BrokerStartup
#broker
1595 BrokerStartup
#proxy
1747 ProxyStartup
#namesrv
1359 NamesrvStartup
如果启动起来的进程少了,可以查看相关日志查看启动失败的原因,日志路径如下:
#namesrv日志
rocketmq-5.2.0/logs/mqnamesrv.log
#master日志
rocketmq-5.2.0/logs/mqbroker.log
#slave日志
rocketmq-5.2.0/logs/mqbroker-s.log
#proxy日志
rocketmq-5.2.0/logs/mqproxy.log
12、发送消息、消费消息验证
使用5.2.0的新版本客户端连接一下集群
12.1、创建topic
在此之前,先创建好topic,5.x不会自动创建,不提前创建会报错:
No topic route info in name server for the topic: NormalTopic
我这里创建的topic名称是:NormalTopic
改动点:把xxx.xxx.xxx的地方换成你的ip,其他不用改动
sh bin/mqadmin updatetopic -n "47.xxx.xxx.xxx:9876;39.xxx.xxx.xxx:9876" -t NormalTopic -c rocketmq-cluster
12.2、rocketmq5.x的maven依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.5</version></dependency>
12.3、生产者
改动点:如果你的topic名称也是NormalTopic,那只需要把xxx.xxx.xxx的地方换成你的ip,其他不用改动
public static void main(String[] args) {// 接入点地址,需要设置成 Proxy 的地址和端口列表String endpoint = "47.xxx.xxx.xxx:28081";// 消息发送的目标Topic名称,需要提前创建。String topic = "NormalTopic";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的TopicProducer producer = null;try {producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();} catch (ClientException e) {log.error("构建producer发生异常",e);}// 普通消息发送Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息.setKeys("messageKey")// 设置消息Tag,用于消费端根据指定Tag过滤消息.setTag("messageTag")// 消息内容实体(byte[]).setBody("hello rocketMQ".getBytes()).build();try {// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);log.info("send message successfully, messageId={}", sendReceipt.getMessageId());} catch (ClientException e) {log.error("failed to send message", e);}// 关闭try {producer.close();} catch (IOException e) {log.error("关闭producer,发生IO异常",e);}}
12.4、消费者
改动点:如果你的topic名称也是NormalTopic,那只需要把xxx.xxx.xxx的地方换成你的ip,其他不用改动
public void pushConsumerTest() throws Exception {ClientServiceProvider provider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表String endpoint = "47.xxx.xxx.xxx:28081";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoint).build();// 订阅消息的过滤规则,表示订阅所有Tag的消息String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建String consumerGroup = "TestGroup";// 指定需要订阅哪个目标Topic,Topic需要提前创建String topic = "NormalTopic";// 初始化 PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration)// 设置消费者分组.setConsumerGroup(consumerGroup)// 设置预绑定的订阅关系.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置消费监听器.setMessageListener(messageView -> {// 处理消息并返回消费结果log.info("consume message successfully, messageId={}", messageView.getMessageId());// 消息内容处理ByteBuffer body = messageView.getBody();String message = StandardCharsets.UTF_8.decode(body).toString();body.flip();log.info("message body={}", message);return ConsumeResult.SUCCESS;}).build();Thread.sleep(Long.MAX_VALUE);// 如果不需要再使用 PushConsumer,可关闭该实例。pushConsumer.close();}
13、dashboard监控
13.1、下载dashboard
从github下载一个dashboard。下载地址
13.2、修改dashboard配置
修改一下namesrv的地址就行了
#连接一台namesrv就好
rocketmq:config:namesrvAddrs: 39.xxx.xxx.xxx:9876
13.3、启动dashboard
启动后,如果集群搭建的没有问题,可以看到集群的节点,2主2从