您的位置:首页 > 娱乐 > 八卦 > 网页设计学习教程_网站ip域名查询_seo专员是指什么意思_百度下载免费安装到桌面

网页设计学习教程_网站ip域名查询_seo专员是指什么意思_百度下载免费安装到桌面

2024/10/5 6:02:08 来源:https://blog.csdn.net/gongzi_9/article/details/142569916  浏览:    关键词:网页设计学习教程_网站ip域名查询_seo专员是指什么意思_百度下载免费安装到桌面
网页设计学习教程_网站ip域名查询_seo专员是指什么意思_百度下载免费安装到桌面

一:基本介绍

        本文通过demo构建测试代码,debug分析的方法查看RabbitMq源码。

rabbit的中文文档: 官方中文文档

二:测试Demo

2.1 引入Springboot整合的RabbitMq依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 手写获取RabbitMq的连接,通道等信息

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** RabbitMq Source Test* @author c* date: 2024-9-26 19:12:27*/
public class RabbitMqSourceTest {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();// 设置rabbitmq的服务器地址connectionFactory.setHost("127.0.0.1");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setPort(AMQP.PROTOCOL.PORT);// 建立连接Connection conn = connectionFactory.newConnection();String exchange = "test-Exchange";String queueName = "test-Queue";String key = "test-Exchange-key";String msg = "测试消息";// 创建一个channelChannel channel = conn.createChannel();// 创建一个直连交换机channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);// 创建一个队列channel.queueDeclare(queueName, true, false, false, null);// 绑定队列channel.queueBind(queueName, exchange, key);// 发送消息channel.basicPublish(exchange, key, null, msg.getBytes());channel.close();conn.close();   }}

上面的基本流程

简单理解为:通过连接,获取通道,数据传输

基本步骤:

  1. 获取连接(Connection)
  2. 获取通道(channel)
  3. 创建交换机(Exchange)
  4. 创建队列(Queue)
  5. 队列通过key绑定交换机(Bind)
  6. 往交换机中的key发送消息
  7. 其他方法

(上面创建消息的也是可以直接创建队列,进行消息的发送,源码中会将没有创建exchange设置成默认的,具体可以自己查看一下)

三:详细分析步骤

3.0 获取连接(Connection)

 public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)throws IOException, TimeoutException {if(this.metricsCollector == null) {this.metricsCollector = new NoOpMetricsCollector();}// make sure we respect the provided thread factoryFrameHandlerFactory fhFactory = createFrameHandlerFactory();ConnectionParams params = params(executor);// set client-provided via a client propertyif (clientProvidedName != null) {Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());properties.put("connection_name", clientProvidedName);params.setClientProperties(properties);}// 如果设置 自动发送为 trueif (isAutomaticRecoveryEnabled()) {// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection// 创建连接            
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);// 初始化conn.init();return conn;} else {// 通过 addrs  获取List<Address> addrs = addressResolver.getAddresses();Exception lastException = null;for (Address addr : addrs) {try {// 创建对应的 FrameHandler FrameHandler handler = fhFactory.create(addr, clientProvidedName);// 创建连接                    AMQConnection conn = createConnection(params, handler, metricsCollector);    conn.start();this.metricsCollector.newConnection(conn);return conn;} catch (IOException e) {lastException = e;} catch (TimeoutException te) {lastException = te;}}if (lastException != null) {if (lastException instanceof IOException) {throw (IOException) lastException;} else if (lastException instanceof TimeoutException) {throw (TimeoutException) lastException;}}throw new IOException("failed to connect");}}

3.1 创建渠道(Channel)

    @Overridepublic Channel createChannel() throws IOException {// 确定开启状态ensureIsOpen();ChannelManager cm = _channelManager;if (cm == null) return null;// 重点可以看下这里: 创建channelChannel channel = cm.createChannel(this);// 通过 metricsCollector 创建新的channelmetricsCollector.newChannel(channel);return channel;}

 createChannel 方法:

        这里主要是通过channelNumberAllocator分配到一个channelNumber,可以理解为一个唯一标识,具体可以自行看一下它的实现

    public ChannelN createChannel(AMQConnection connection) throws IOException {ChannelN ch;synchronized (this.monitor) {// 通过 channelNumberAllocator 获取到一个 channelNumber int channelNumber = channelNumberAllocator.allocate();if (channelNumber == -1) {return null;} else {ch = addNewChannel(connection, channelNumber);}}ch.open(); // now that it's been safely addedreturn ch;}

3.2 创建交换机(Exchange)

    public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException {final AMQP.Exchange.DeclareOk ok = delegate.exchangeDeclare(exchange, type, durable, autoDelete, internal, arguments);RecordedExchange x = new RecordedExchange(this, exchange).type(type).durable(durable).autoDelete(autoDelete).arguments(arguments);// 记录当前的交换机recordExchange(exchange, x);return ok;}

进入delegate.exchangeDeclare方法,可以看到控制台会创建成功exchange:

3.3 创建队列(Queue)

    @Overridepublic AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {// 这里执行完成会创建成功队列final AMQP.Queue.DeclareOk ok = delegate.queueDeclare(queue, durable, exclusive, autoDelete, arguments);RecordedQueue q = new RecordedQueue(this, ok.getQueue()).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments);if (queue.equals(RecordedQueue.EMPTY_STRING)) {q.serverNamed(true);}recordQueue(ok, q);return ok;}
    @Overridepublic Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,boolean autoDelete, Map<String, Object> arguments)throws IOException{validateQueueNameLength(queue);return (Queue.DeclareOk)// 通过rpc申明 信息exnWrappingRpc(new Queue.Declare.Builder().queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod();}

可以看到此时虽然创建了queue,但是并未绑定到exchang上面,需要进行下面的绑定

3.4 绑定队列

    @Overridepublic AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException {// 绑定队列AMQP.Queue.BindOk ok = delegate.queueBind(queue, exchange, routingKey, arguments);recordQueueBinding(queue, exchange, routingKey, arguments);return ok;}
    @Overridepublic Queue.BindOk queueBind(String queue, String exchange,String routingKey, Map<String, Object> arguments)throws IOException{validateQueueNameLength(queue);return (Queue.BindOk)    // 通过 rpc 申明绑定信息exnWrappingRpc(new Queue.Bind.Builder().queue(queue).exchange(exchange).routingKey(routingKey).arguments(arguments).build()).getMethod();}

3.5 发送消息

    @Overridepublic void basicPublish(String exchange, String routingKey,boolean mandatory, boolean immediate,BasicProperties props, byte[] body)throws IOException{if (nextPublishSeqNo > 0) {unconfirmedSet.add(getNextPublishSeqNo());nextPublishSeqNo++;}if (props == null) {props = MessageProperties.MINIMAL_BASIC;}// 组装消息AMQCommand command = new AMQCommand(new Basic.Publish.Builder().exchange(exchange).routingKey(routingKey).mandatory(mandatory).immediate(immediate).build(), props, body);try {// 发送消息transmit(command);} catch (IOException e) {metricsCollector.basicPublishFailure(this, e);throw e;}// 推送当前的channel 进行发布metricsCollector.basicPublish(this);}

四:总结

发送消息可以理解为以下步骤:

  1. 通过Channel往Rabbit服务端发送消息
  2. 通过PRC申明交换机,队列,绑定等信息
  3. 通过AMQP协议发送消息

   👍如果对你有帮助,给博主一个免费的点赞以示鼓励
欢迎各位🔎点赞👍评论收藏⭐️

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com