您的位置:首页 > 汽车 > 新车 > 杭州网站建设_免费ppt模板免费_360优化大师官方下载手机_免费微信引流推广的方法

杭州网站建设_免费ppt模板免费_360优化大师官方下载手机_免费微信引流推广的方法

2025/1/10 14:56:35 来源:https://blog.csdn.net/2401_82540083/article/details/144801562  浏览:    关键词:杭州网站建设_免费ppt模板免费_360优化大师官方下载手机_免费微信引流推广的方法
杭州网站建设_免费ppt模板免费_360优化大师官方下载手机_免费微信引流推广的方法

目录

1. 单机搭建

2. 测试RocketMQ

3. 集群搭建

4. 集群启动 

5. RocketMQ-DashBoard搭建

6. 不同类型消息发送

1.同步消息

2. 异步消息发送

3. 单向发送消息

7. 消费消息


1. 单机搭建

1. 先从rocketmq官网下载二进制包,ftp上传至linux服务器,unzip命令解压。

2. 启动NameServer

# 1.启动NameServer
nohup sh bin/mqnamesrv &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log

3. 启动Broker

RocketMQ默认的虚拟机内存较大,启动Broker如果因为内存不足失败,需要编辑如下两个配置文件,修改JVM内存大小。

# 编辑runbroker.sh和runserver.sh修改默认JVM大小
vi runbroker.sh
vi runserver.sh
  • 参考设置:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

# 1.启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log 

2. 测试RocketMQ

1. 发送消息

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

2. 接收消息

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

3. 关闭RocketMQ

# 1.关闭NameServer
sh mqshutdown namesrv
# 2.关闭Broker
sh mqshutdown broker

3. 集群搭建

这里采用broker双主双从(同步模式),NameServer、Producer、Producer集群由于无状态性,搭建简单。(Master和Slave的brokerName相同,brokerId不同)

1. 服务器环境

序号IP角色架构模式
1192.168.183.132nameserver、brokerserverMaster1、Slave2
2192.168.183.128nameserver、brokerserverMaster2、Slave1

2. hosts配置(两台一样配置)

vi /etc/hosts

# 重启网卡
systemctl restart network

3. 防火墙关闭

# 关闭防火墙
systemctl stop firewalld.service 
# 查看防火墙的状态
firewall-cmd --state 
# 禁止firewall开机启动
systemctl disable firewalld.service

4. 创建消息存储路径

mkdir -p /root/store/master1
mkdir -p /root/store/master1/commitlog
mkdir -p /root/store/master1/consumequeue
mkdir -p /root/store/master1/indexmkdir -p /root/store/slave2
mkdir -p /root/store/slave2/commitlog
mkdir -p /root/store/slave2/consumequeue
mkdir -p /root/store/slave2/indexmkdir -p /root/store/master2
mkdir -p /root/store/master2/commitlog
mkdir -p /root/store/master2/consumequeue
mkdir -p /root/store/master2/indexmkdir -p /root/store/slave1
mkdir -p /root/store/slave1/commitlog
mkdir -p /root/store/slave1/consumequeue
mkdir -p /root/store/slave1/index

5. broker配置文件

1)master1

服务器:192.168.183.132

vi /root/rocketmq4.4/conf/2m-2s-sync/broker-a.properties
# 所属集群名称
brokerClusterName=rocketmq-cluster
# broker名字,注意:不同的broker名字必须不同
brokerName=broker-a
# brokerId, brokerId=0表示Master,大于0表示Slave
brokerId=0
brokerIP1=rocketmq-master1
# nameserver地址,分号分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defalutTopicQueueNums=4
# 是否允许自动创建Topic,建议线下环境开启,线上环境关闭
autoCreateTopicEnable=true
# 是否允许自动创建订阅组,建议线下环境开启,线上环境关闭
autoCreateSubscriptionGroup=true
# Broker 对外暴露端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时(测试环境建议设置120分钟)
fileReservedTime=120
# commitlog文件大小,默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
disMaxUsedSpaceRatio=88#存储路径
storePathRootDir=/root/store/master1
#commitLog 存储路径
storePathCommitLog=/root/store/master1/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/store/master1/consumequeue
#消息索引存储路径
storePathIndex=/root/store/master1/index
#checkpoint 文件存储路径
storeCheckpoint=/root/store/master1/checkpoint
#abort 文件存储路径
abortFile=/root/store/master1/abort# 限制消息的大小
maxMessageSize=65536# Broker角色
brokerRole=SYNC_MASTER
# 刷盘方式
flushDiskType=SYNC_FLUSH

2)slave2

服务器:192.168.183.132

vi /root/rocketmq4.4/conf/2m-2s-sync/broker-b-s.properties
# 所属集群名称
brokerClusterName=rocketmq-cluster
# broker名字,注意:不同的broker名字必须不同
brokerName=broker-b
# brokerId, brokerId=0表示Master,大于0表示Slave
brokerId=1
brokerIP1=rocketmq-slave2
# nameserver地址,分号分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defalutTopicQueueNums=4
# 是否允许自动创建Topic,建议线下环境开启,线上环境关闭
autoCreateTopicEnable=true
# 是否允许自动创建订阅组,建议线下环境开启,线上环境关闭
autoCreateSubscriptionGroup=true
# Broker 对外暴露端口
listenPort=11011
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时(测试环境建议设置120分钟)
fileReservedTime=120
# commitlog文件大小,默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
disMaxUsedSpaceRatio=88#存储路径
storePathRootDir=/root/store/slave2
#commitLog 存储路径
storePathCommitLog=/root/store/slave2/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/store/slave2/consumequeue
#消息索引存储路径
storePathIndex=/root/store/slave2/index
#checkpoint 文件存储路径
storeCheckpoint=/root/store/slave2/checkpoint
#abort 文件存储路径
abortFile=/root/store/slave2/abort# 限制消息的大小
maxMessageSize=65536# Broker角色
brokerRole=SLAVE
# 刷盘方式
flushDiskType=ASYNC_FLUSH

3)master2

服务器:192.168.183.128

vi /root/rocketmq4.4/conf/2m-2s-sync/broker-b.properties
# 所属集群名称
brokerClusterName=rocketmq-cluster
# broker名字,注意:不同的broker名字必须不同
brokerName=broker-b
# brokerId, brokerId=0表示Master,大于0表示Slave
brokerId=0
brokerIP1=rocketmq-master2
# nameserver地址,分号分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defalutTopicQueueNums=4
# 是否允许自动创建Topic,建议线下环境开启,线上环境关闭
autoCreateTopicEnable=true
# 是否允许自动创建订阅组,建议线下环境开启,线上环境关闭
autoCreateSubscriptionGroup=true
# Broker 对外暴露端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时(测试环境建议设置120分钟)
fileReservedTime=120
# commitlog文件大小,默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
disMaxUsedSpaceRatio=88#存储路径
storePathRootDir=/root/store/master2
#commitLog 存储路径
storePathCommitLog=/root/store/master2/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/store/master2/consumequeue
#消息索引存储路径
storePathIndex=/root/store/master2/index
#checkpoint 文件存储路径
storeCheckpoint=/root/store/master2/checkpoint
#abort 文件存储路径
abortFile=/root/store/master2/abort# 限制消息的大小
maxMessageSize=65536# Broker角色
brokerRole=SYNC_MASTER
# 刷盘方式
flushDiskType=SYNC_FLUSH

4)slave1

服务器:192.168.183.128

vi /root/rocketmq4.4/conf/2m-2s-sync/broker-a-s.properties
# 所属集群名称
brokerClusterName=rocketmq-cluster
# broker名字,注意:不同的broker名字必须不同
brokerName=broker-a
# brokerId, brokerId=0表示Master,大于0表示Slave
brokerId=1
brokerIP1=rocketmq-slave1
# nameserver地址,分号分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defalutTopicQueueNums=4
# 是否允许自动创建Topic,建议线下环境开启,线上环境关闭
autoCreateTopicEnable=true
# 是否允许自动创建订阅组,建议线下环境开启,线上环境关闭
autoCreateSubscriptionGroup=true
# Broker 对外暴露端口
listenPort=11011
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时(测试环境建议设置120分钟)
fileReservedTime=120
# commitlog文件大小,默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
disMaxUsedSpaceRatio=88#存储路径
storePathRootDir=/root/store/slave1
#commitLog 存储路径
storePathCommitLog=/root/store/slave1/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/store/slave1/consumequeue
#消息索引存储路径
storePathIndex=/root/store/slave1/index
#checkpoint 文件存储路径
storeCheckpoint=/root/store/slave1/checkpoint
#abort 文件存储路径
abortFile=/root/store/slave1/abort# 限制消息的大小
maxMessageSize=65536# Broker角色
brokerRole=SLAVE
# 刷盘方式
flushDiskType=ASYNC_FLUSH

4. 集群启动 

1)启动NameServe集群

分别在192.168.183.132和192.168.183.128启动NameServer

nohup sh mqnamesrv &

启动成功。

2)启动broker集群

  • 在192.168.183.132上启动master1和slave2
# master1
nohup sh mqbroker -c /root/rocketmq4.4/conf/2m-2s-sync/broker-a.properties &
# slave2
nohup sh mqbroker -c /root/rocketmq4.4/conf/2m-2s-sync/broker-b-s.properties &

启动成功。

  • 在192.168.183.128上启动master2和slave1
# slave1
nohup sh mqbroker -c /root/rocketmq4.4/conf/2m-2s-sync/broker-a-s.properties &
# master2
nohup sh mqbroker -c /root/rocketmq4.4/conf/2m-2s-sync/broker-b.properties &

3)日志查看

# 查看nameServer日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
# 查看broker日志
tail -500f ~/logs/rocketmqlogs/broker.log

5. RocketMQ-DashBoard搭建

github上拉取项目后,修改yml的namesrvAddrs即可。

6. 不同类型消息发送

pom.xml

        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version></dependency>

1.同步消息

SyncProducer.java 

/*** 发送同步消息*/
public class SyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("group1");// 设置NameServer地址producer.setNamesrvAddr("192.168.183.132:9876;192.168.183.128:9876");// 启动Producer实例producer.start();for (int i = 0; i < 10; i++) {// 创建消息,并指定Topic,Tag,消息体Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息到brokerSendResult sendResult = producer.send(msg);// 发送状态SendStatus status = sendResult.getSendStatus();// 消息IDString msgId = sendResult.getMsgId();// 消息接收队列IDint queueId = sendResult.getMessageQueue().getQueueId();System.out.println("发送状态: " + status + " 消息ID: " + msgId + " 消息接收队列ID: " + queueId);TimeUnit.SECONDS.sleep(1);}// 如果不再发送消息,关闭Producer实例producer.shutdown();}
}

2. 异步消息发送

 AsyncProducer1.java


public class AsyncProducer1 {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("group1");// 设置NameServer地址producer.setNamesrvAddr("192.168.183.132:9876;192.168.183.128:9876");// 启动Producer实例producer.start();// 设置重试次数producer.setRetryTimesWhenSendAsyncFailed(0);for (int i = 0; i < 10; i++) {// 创建消息,并指定Topic,Tag,消息体Message msg = new Message("TopicTest","TagB",("Hello World " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// SendCallback 接收异步返回结果的回调int finalI = i;producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送结果:" + sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.println("发送异常:" + throwable);}});TimeUnit.SECONDS.sleep(1);}producer.shutdown();}
}

3. 单向发送消息

不关心结果,比如日志发送

public class OnewayProducer {public static void main(String[] args) throws Exception{// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("192.168.183.132:9876;192.168.183.128:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送单向消息,没有任何返回结果producer.sendOneway(msg);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

7. 消费消息


public class Consumer1 {public static void main(String[] args) throws MQClientException {// 实例化消费者,指定组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");// 指定NameServer地址信息consumer.setNamesrvAddr("192.168.183.132:9876;192.168.183.128:9876");// 订阅Topicconsumer.subscribe("TopicTest", "*");// 负载均衡模式消费consumer.setMessageModel(MessageModel.CLUSTERING);   // 广播模式 MessageModel.BROADCASTING// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " " + msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();System.out.println("Consumer Started.");}
}

负载均衡模式消费:多个消费者共同处理broker消息队列的消息。

广播模式消费:每个消费者都会收到订阅的Topic的消息。

持续更新中......

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com