安装
前置条件
- activemq的运行依赖于jdk,需要
提前安装jdk
- 如果已经安装了jdk,需要根据jdk的版本来选择对应的版本进行安装activemq
- 版本对应在官网上,使用
java -version
看jdk的版本- 注意:jdk和mq的版本不一致会报错,电脑的命名
不能用中文
启动
- 有的版本bin目录会有2个目录win32和win64,有的只有win64
- 打开bin目录
- 点击activemq.bat,访问网址 http://127.0.0.1:8161/admin/
使用服务进行启动
- 上述方式使用bat文件启动,只要cmd窗口一关,就会关闭
- 自带安装服务和卸载服务的bat文件(使用
管理员
权限运行)
关闭或开启验证
- 更改完配置之后记得
重启
一下服务
消息中间件的应用场景
- 异步处理
- 应用解耦
- 流量削峰
场景
用户注册需要三个功能:写数据库、发送邮件、注册短信
异步处理
串行
并行
消息中间件
对比
应用解耦
流量削峰
中间件对比
- activemq:java、万级吞吐量、毫秒级响应速度、可用性高(主从架构),很多公司在用
- rabbitmq:erlang,其他同上,管理界面丰富
- rocketmq:java,10万级别,可用性很高(分布式)
- kafka:消息查询和追溯没有提供,大数据应用广泛
jms消息模型
- 两种模型
- P2P(point to point):点对点模型(queue模型)
- P/S(publish/subscribe):发布订阅模型(topic模型)
点对点
生产者
和消费者
之间的消息往来
特点
- 每个消息只能被
一个
消费者消费,消费之后就直接消失了- 生产者和消费者没有直接
依赖
关系,不管消费者有没有运行,都不妨碍生产者将消息发给队列- 消费者成功接受消息之后需要向队列回应
应答
成功
发布订阅模型
三个
角色
- 发布者
- 订阅者
- 主题
- 发布者将消息发布到
主题
,只有订阅了主题的订阅者才能接收到消息topic
来实现消息的发布- 当一个消息被发布,
n
个订阅者都可以得到消息的拷贝
特点
- 一个主题可以被
多个
订阅者订阅- 存在时间的
先后顺序
,消费者需要先订阅
,生产者才能发布消息- 订阅者必须保持
持续运行
状态,才能接收到生产者的消息
JMS API
访问地址
- http协议:http://127.0.0.1:8161/(网页
监控
)- tcp协议:tcp://127.0.0.1:61616(java
后端
访问)
JMS原始方式
点对点queue
引入依赖
log4j
的依赖也需要引入一下
<dependencies><!-- 引入原生的依赖 5.18版本不受activemq版本的影响 --><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>6.1.0</version></dependency><!-- 引入log4j的依赖,不引入会报错 --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.17.1</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.17.1</version></dependency></dependencies>
生产者
- 后端使用的端口是
tcp://localhost:61616
import jakarta.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;// 队列生产者 点对点
public class Producer {public static void main(String[] args) throws JMSException {// 1. 创建连接工厂 默认使用tcp协议ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 2. 创建连接Connection connection = activeMQConnectionFactory.createConnection();// 3. 启动连接connection.start();// 4. 创建会话// 第一个参数:是否开启事务// 第二个参数:消息确认机制Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 创建队列Queue queue = session.createQueue("queue01");// 6. 创建消息生产者MessageProducer producer = session.createProducer(queue);// 7. 创建消息 createTextMessage创建文本消息TextMessage textMessage = session.createTextMessage("Hello, ActiveMQ!");// 8. 发送消息producer.send(textMessage);// 9. 关闭资源 从后往前关闭session.close();connection.close();}
}
下图表示发送消息成功
消费者
一般方式(不常用)
- 步骤和生产者如出一辙,就是最后变成了
receive
方法- 使用
while true
,使得消费者一直处在等待
中
// 点对点消费者
public class Consumer {public static void main(String[] args) throws JMSException {//1. 创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");//2. 创建连接Connection connection = connectionFactory.createConnection();//3. 启动连接connection.start();//4. 创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5. 指定队列Queue queue = session.createQueue("queue01");//6. 创建消息消费者MessageConsumer consumer = session.createConsumer(queue);//7. 接受并且消费消息//receive()是阻塞方法,如果没有消息会一直等待// 一般不会关闭消费者,因为一旦关闭就不能接收消息了while (true) {Message textMessage = consumer.receive();if (textMessage == null) {break;}//判断消息类型if (textMessage instanceof TextMessage) {TextMessage message = (TextMessage) textMessage;System.out.println("接收到消息:" + message.getText());}}}
}
监听器(推荐使用)
使用监听器,更加优雅
public class Listener {public static void main(String[] args) throws JMSException {//1. 创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");//2. 创建连接Connection connection = connectionFactory.createConnection();//3. 启动连接connection.start();//4. 创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5. 指定队列Queue queue = session.createQueue("queue01");//6. 创建消息消费者MessageConsumer consumer = session.createConsumer(queue);//7. 设置监听器 匿名内部类consumer.setMessageListener(new MessageListener() {// 重写onMessage方法 接收消息@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;try {System.out.println("接收到消息:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}});}
}