您的位置:首页 > 健康 > 养生 > java消息队列ActiveMQ

java消息队列ActiveMQ

2024/12/23 10:55:38 来源:https://blog.csdn.net/qq_40603125/article/details/141013856  浏览:    关键词:java消息队列ActiveMQ

安装

前置条件

  • activemq的运行依赖于jdk,需要提前安装jdk
  • 如果已经安装了jdk,需要根据jdk的版本来选择对应的版本进行安装activemq
  • 版本对应在官网上,使用java -version 看jdk的版本
  • 注意:jdk和mq的版本不一致会报错,电脑的命名不能用中文

启动

  • 有的版本bin目录会有2个目录win32和win64,有的只有win64
  • 打开bin目录
  • 点击activemq.bat,访问网址 http://127.0.0.1:8161/admin/

在这里插入图片描述

使用服务进行启动

  • 上述方式使用bat文件启动,只要cmd窗口一关,就会关闭
  • 自带安装服务和卸载服务的bat文件(使用管理员权限运行)

在这里插入图片描述

关闭或开启验证

  • 更改完配置之后记得重启一下服务

在这里插入图片描述
在这里插入图片描述

消息中间件的应用场景

  • 异步处理
  • 应用解耦
  • 流量削峰

场景

用户注册需要三个功能:写数据库、发送邮件、注册短信

异步处理

串行

在这里插入图片描述

并行

在这里插入图片描述

消息中间件

在这里插入图片描述

对比

在这里插入图片描述

应用解耦

在这里插入图片描述

流量削峰

在这里插入图片描述

中间件对比

  • activemq:java、万级吞吐量、毫秒级响应速度、可用性高(主从架构),很多公司在用
  • rabbitmq:erlang,其他同上,管理界面丰富
  • rocketmq:java,10万级别,可用性很高(分布式)
  • kafka:消息查询和追溯没有提供,大数据应用广泛

jms消息模型

  • 两种模型
  • P2P(point to point):点对点模型(queue模型)
  • P/S(publish/subscribe):发布订阅模型(topic模型)

点对点

生产者消费者之间的消息往来

在这里插入图片描述

特点

  • 每个消息只能被一个消费者消费,消费之后就直接消失了
  • 生产者和消费者没有直接依赖关系,不管消费者有没有运行,都不妨碍生产者将消息发给队列
  • 消费者成功接受消息之后需要向队列回应应答成功

在这里插入图片描述

发布订阅模型

三个角色

  • 发布者
  • 订阅者
  • 主题
  • 发布者将消息发布到主题,只有订阅了主题的订阅者才能接收到消息
  • topic来实现消息的发布
  • 当一个消息被发布,n个订阅者都可以得到消息的拷贝

在这里插入图片描述

特点

  • 一个主题可以被多个订阅者订阅
  • 存在时间的先后顺序,消费者需要先订阅,生产者才能发布消息
  • 订阅者必须保持持续运行状态,才能接收到生产者的消息

JMS API

在这里插入图片描述

在这里插入图片描述

访问地址

  • http协议:http://127.0.0.1:8161/(网页监控)
  • tcp协议:tcp://127.0.0.1:61616(java后端访问)

JMS原始方式

点对点queue

引入依赖

  • log4j的依赖也需要引入一下
    <dependencies><!--   引入原生的依赖  5.18版本不受activemq版本的影响   --><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>6.1.0</version></dependency><!--   引入log4j的依赖,不引入会报错     --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.17.1</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.17.1</version></dependency></dependencies>

生产者

  • 后端使用的端口是tcp://localhost:61616
import jakarta.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;// 队列生产者 点对点
public class Producer {public static void main(String[] args) throws JMSException {// 1. 创建连接工厂 默认使用tcp协议ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 2. 创建连接Connection connection = activeMQConnectionFactory.createConnection();// 3. 启动连接connection.start();// 4. 创建会话// 第一个参数:是否开启事务// 第二个参数:消息确认机制Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 创建队列Queue queue = session.createQueue("queue01");// 6. 创建消息生产者MessageProducer producer = session.createProducer(queue);// 7. 创建消息 createTextMessage创建文本消息TextMessage textMessage = session.createTextMessage("Hello, ActiveMQ!");// 8. 发送消息producer.send(textMessage);// 9. 关闭资源 从后往前关闭session.close();connection.close();}
}

下图表示发送消息成功

在这里插入图片描述

消费者

一般方式(不常用)
  • 步骤和生产者如出一辙,就是最后变成了receive方法
  • 使用while true,使得消费者一直处在等待
// 点对点消费者
public class Consumer {public static void main(String[] args) throws JMSException {//1. 创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");//2. 创建连接Connection connection = connectionFactory.createConnection();//3. 启动连接connection.start();//4. 创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5. 指定队列Queue queue = session.createQueue("queue01");//6. 创建消息消费者MessageConsumer consumer = session.createConsumer(queue);//7. 接受并且消费消息//receive()是阻塞方法,如果没有消息会一直等待// 一般不会关闭消费者,因为一旦关闭就不能接收消息了while (true) {Message textMessage = consumer.receive();if (textMessage == null) {break;}//判断消息类型if (textMessage instanceof TextMessage) {TextMessage message = (TextMessage) textMessage;System.out.println("接收到消息:" + message.getText());}}}
}
监听器(推荐使用)

使用监听器,更加优雅

public class Listener {public static void main(String[] args) throws JMSException {//1. 创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");//2. 创建连接Connection connection = connectionFactory.createConnection();//3. 启动连接connection.start();//4. 创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5. 指定队列Queue queue = session.createQueue("queue01");//6. 创建消息消费者MessageConsumer consumer = session.createConsumer(queue);//7. 设置监听器 匿名内部类consumer.setMessageListener(new MessageListener() {// 重写onMessage方法 接收消息@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;try {System.out.println("接收到消息:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}});}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com