RocketMq
rocketmq介绍
RocketMQ 是一款分布式消息中间件,由阿里巴巴开源并捐赠给 Apache 基金会,它借鉴了 Kafka 的设计并进行了很多改进和优化,具有高性能、高可靠、易扩展等特点。RocketMQ 支持多种消息模型,包括发布/订阅、点对点、事务消息等,适用于各种分布式系统中的消息传递需求。
优点 | 缺点 | 适合场景 | |
---|---|---|---|
apache kafka | 吞吐量大,性能好,集群高可用 | 存在数据丢失,功能单一 | 日志分析,大数据采集 |
rabittmq | 消息可靠性高,功能全面 | 吞吐量低,生态差 | 小规模服务调用 |
apache pulsar | 消息可靠性高 | 生态不完善 | 大规模服务调用 |
apache rocketmq | 高吞吐,高并发,高性能,协议丰富 | 服务加载慢 | 几乎全场景,特别适合金融业务(电商,支付等) |
官方地址: | |||
https://rocketmq.apache.org/ |
下载地址:
https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip
rocketmq的特点
- 高性能:RocketMQ 采用零拷贝技术,减少了消息传递过程中的内存拷贝,提高了消息传递的效率。
- 高可靠性:RocketMQ 支持消息持久化,确保消息不会丢失。同时,RocketMQ 还支持消息重试和死信队列,提高了消息的可靠性。
- 高扩展性:RocketMQ 支持集群部署,可以水平扩展,提高系统的处理能力。
- 多种消息模型:RocketMQ 支持发布/订阅、点对点、事务消息等多种消息模型,满足各种分布式系统中的消息传递需求。
- 丰富的协议:RocketMQ 支持多种协议,包括 TCP、HTTP、MQTT 等,可以满足各种应用场景的需求。
- 丰富的功能:RocketMQ 支持消息过滤、消息轨迹、消息重试、死信队列等功能,提高了消息处理的灵活性和可靠性。
rocketmq的架构
RocketMQ 的架构主要包括以下几个部分:
- NameServer:NameServer 是 RocketMQ 的注册中心,负责管理 Broker 的元数据信息,包括 Broker 的地址、状态等。NameServer 提供了 Broker 的注册和发现功能,客户端可以通过 NameServer 获取 Broker 的地址信息。
- Broker:Broker 是 RocketMQ 的消息存储和转发节点,负责接收和存储消息,并将消息转发给消费者。Broker 支持消息持久化,确保消息不会丢失。同时,Broker 还支持消息重试和死信队列,提高了消息的可靠性。
- Producer:Producer 是消息的生产者,负责发送消息到 Broker。Producer 可以通过 NameServer 获取 Broker 的地址信息,并将消息发送到 Broker。
- Consumer:Consumer 是消息的消费者,负责从 Broker 拉取或订阅消息进行处理。Consumer 可以通过 NameServer 获取 Broker 的地址信息,并从 Broker 拉取或订阅消息。
- Topic:Topic 是消息的分类,用于将消息进行分组。Producer 和 Consumer 可以通过 Topic 来指定要发送或接收的消息。
- Message:Message 是 RocketMQ 的消息载体,包含了消息的元数据(如消息 ID、消息类型、消息内容等)和消息体。
- Offset:Offset 是消息的偏移量,用于标识消息在 Broker 中的位置。Consumer 可以通过 Offset 来指定要拉取或订阅的消息位置。
- Queue:Queue 是消息的队列,用于将消息进行分片存储。Broker 可以将消息存储在多个 Queue 中,以提高消息的并发处理能力。
- Transaction:Transaction 是 RocketMQ 的分布式事务支持,用于保证消息的可靠性和一致性。Producer 可以通过 Transaction 发送事务消息,并在事务提交或回滚时更新消息的状态。
- Filter:Filter 是 RocketMQ 的消息过滤功能,用于根据消息的属性或内容进行消息过滤。Producer 和 Consumer 可以通过 Filter 来指定要发送或接收的消息。
- Trace:Trace 是 RocketMQ 的消息追踪功能,用于记录消息的发送、接收和处理过程。Producer 和 Consumer 可以通过 Trace 来查看消息的轨迹,以便进行问题排查和性能优化。
- Cluster:Cluster 是 RocketMQ 的集群模式,用于将多个 Broker 组成一个集群,以提高系统的处理能力和可靠性。集群中的 Broker 可以通过选举机制选择 Master 和 Slave,实现数据的备份和恢复。
- HA:HA 是 RocketMQ 的高可用性支持,用于保证 Broker 的可靠性和可用性。HA 通过主从复制和故障转移机制,确保 Broker 在发生故障时能够自动切换,保证系统的持续运行。
- Security:Security 是 RocketMQ 的安全支持,用于保护消息的传输和存储安全。RocketMQ 支持多种安全机制,包括 SSL/TLS 加密、身份验证等,确保消息的机密性和完整性。
- Monitoring:Monitoring 是 RocketMQ 的监控功能,用于实时监控系统的运行状态和性能指标。RocketMQ 提供了丰富的监控指标,包括消息吞吐量、延迟、队列长度等,帮助用户了解系统的运行状态并进行性能优化。
- Tools:Tools 是 RocketMQ 的管理工具,用于管理和监控 RocketMQ 集群。RocketMQ 提供了丰富的管理工具,包括命令行工具、Web 管理界面等,帮助用户进行集群管理和性能优化。
- Integration:Integration 是 RocketMQ 的集成功能,用于与其他系统集成。RocketMQ 支持与多种系统集成,包括大数据平台、流处理平台、微服务等,以满足各种应用场景的需求。
- Extensibility:Extensibility 是 RocketMQ 的可扩展性支持,用于支持自定义扩展。RocketMQ 提供了丰富的扩展接口,包括消息存储、消息转发、消息过滤等,帮助用户进行自定义扩展和优化。
- Scalability:Scalability 是 RocketMQ 的可扩展性支持,用于支持水平扩展。RocketMQ 支持集群部署,可以水平扩展,提高系统的处理能力。
- High Availability:High Availability 是 RocketMQ 的高可用性支持,用于保证系统的可靠性和可用性。RocketMQ 支持集群部署,可以水平扩展,提高系统的处理能力。
- High Performance:High Performance 是 RocketMQ 的高性能支持,用于提高系统的处理能力。RocketMQ 采用零拷贝技术,减少了消息传递过程中的内存拷贝,提高了消息传递的效率。
- Rich Protocols:Rich Protocols 是 RocketMQ 的协议支持,用于支持多种协议。RocketMQ 支持多种协议,包括 TCP、HTTP、MQTT 等,可以满足各种应用场景的需求。
- Rich Features:Rich Features 是 RocketMQ 的功能支持,用于支持丰富的消息处理功能。RocketMQ 支持消息过滤、消息轨迹、消息重试、死信队列等功能,提高了消息处理的灵活性和可靠性。
rocketmq的安装
下载RocketMQ安装包
https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip
解压安装包
unzip rocketmq-all-4.9.4-bin-release.zip
启动NameServer
nohup sh bin/mqnamesrv &
启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
验证启动是否成功
ps -ef|grep java
501 5434 5394 0 10:46PM ttys001 0:02.85 /Library/Java/JavaVirtualMachines/jdk-23.jdk/Contents/Home/bin/java -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -Xlog:gc*:file=/Volumes/RAMDisk/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M -XX:-OmitStackTraceInFastThrow -XX:-UseLargePages -cp .:/Users/hhl/Downloads/rocketmq-all-4.9.4-bin-release/bin/../conf:/Users/hhl/Downloads/rocketmq-all-4.9.4-bin-release/bin/../lib/*: org.apache.rocketmq.namesrv.NamesrvStartup501 5994 4718 0 11:06PM ttys001 0:00.01 grep java501 5930 5920 0 11:04PM ttys002 0:06.76 /Library/Java/JavaVirtualMachines/jdk-23.jdk/Contents/Home/bin/java -server -Xms8g -Xmx8g -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -Xlog:gc*:file=/Volumes/RAMDisk/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M -XX:-OmitStackTraceInFastThrow -XX:+AlwaysPreTouch -XX:MaxDirectMemorySize=15g -XX:-UseLargePages -cp .:/Users/hhl/Downloads/rocketmq-all-4.9.4-bin-release/bin/../conf:/Users/hhl/Downloads/rocketmq-all-4.9.4-bin-release/bin/../lib/*: org.apache.rocketmq.broker.BrokerStartup -n localhost:9876
测试RocketMQ
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
停止RocketMQ
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
spirngboot整合
参考项目:https://gitee.com/naseng/springboot-rocketmq.git