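1: Introduction
This article builds a small demo and steps through it in a debugger to read the RabbitMQ Java client source code.
RabbitMQ documentation in Chinese: official Chinese documentation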
2: Test demo
2.1 Add the Spring Boot RabbitMQ starter dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 Obtain the RabbitMQ connection and channel by hand
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * RabbitMq Source Test
 * @author c
 * date: 2024-9-26 19:12:27
 */
public class RabbitMqSourceTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Address and credentials of the RabbitMQ server
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setPort(AMQP.PROTOCOL.PORT);
        // Open the connection
        Connection conn = connectionFactory.newConnection();
        String exchange = "test-Exchange";
        String queueName = "test-Queue";
        String key = "test-Exchange-key";
        String msg = "测试消息";
        // Create a channel
        Channel channel = conn.createChannel();
        // Declare a direct exchange
        channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
        // Declare a durable, non-exclusive queue that is not auto-deleted
        channel.queueDeclare(queueName, true, false, false, null);
        // Bind the queue to the exchange with the routing key
        channel.queueBind(queueName, exchange, key);
        // Publish the message; encode explicitly so the bytes do not depend on the platform charset
        channel.basicPublish(exchange, key, null, msg.getBytes(StandardCharsets.UTF_8));
        channel.close();
        conn.close();
    }
}
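Basic flow of the code above
In short: obtain a connection, open a channel on it, and transfer data over the channel.
Basic steps:
- Obtain a connection (Connection)
- Obtain a channel (Channel)
- Declare an exchange (Exchange)
- Declare a queue (Queue)
- Bind the queue to the exchange with a routing key (Bind)
- Publish a message to the exchange with that routing key
- Other methods
(The demo could also skip the exchange and declare only a queue: publishing with an empty exchange name sends the message through the default exchange, which routes by queue name. The details are easy to follow in the source.)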
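The routing behind these steps, including the default-exchange case from the note above, can be modeled in a few lines. This is a toy in-memory sketch, not the broker's or the client's actual code: the class DirectRoutingSketch and its methods are hypothetical names that only mirror the Channel calls used in the demo.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of direct-exchange routing (not RabbitMQ code).
class DirectRoutingSketch {
    // queue name -> messages waiting in that queue
    private final Map<String, Deque<String>> queues = new HashMap<>();
    // exchange name -> (binding key -> names of bound queues)
    private final Map<String, Map<String, List<String>>> bindings = new HashMap<>();

    void exchangeDeclare(String exchange) {
        bindings.putIfAbsent(exchange, new HashMap<>());
    }

    void queueDeclare(String queue) {
        queues.putIfAbsent(queue, new ArrayDeque<>());
    }

    void queueBind(String queue, String exchange, String key) {
        if (!bindings.containsKey(exchange) || !queues.containsKey(queue)) {
            throw new IllegalStateException("declare the exchange and the queue first");
        }
        bindings.get(exchange).computeIfAbsent(key, k -> new ArrayList<>()).add(queue);
    }

    // Routes the message and returns how many queues received it.
    int basicPublish(String exchange, String routingKey, String body) {
        List<String> targets;
        if (exchange.isEmpty()) {
            // Default exchange: the routing key is interpreted as a queue name.
            targets = queues.containsKey(routingKey)
                    ? Collections.singletonList(routingKey)
                    : Collections.<String>emptyList();
        } else {
            // Direct exchange: the routing key must equal the binding key.
            targets = bindings
                    .getOrDefault(exchange, Collections.<String, List<String>>emptyMap())
                    .getOrDefault(routingKey, Collections.<String>emptyList());
        }
        for (String queue : targets) {
            queues.get(queue).add(body);
        }
        return targets.size();
    }

    int depth(String queue) {
        return queues.get(queue).size();
    }

    public static void main(String[] args) {
        DirectRoutingSketch broker = new DirectRoutingSketch();
        broker.exchangeDeclare("test-Exchange");
        broker.queueDeclare("test-Queue");
        broker.queueBind("test-Queue", "test-Exchange", "test-Exchange-key");
        broker.basicPublish("test-Exchange", "test-Exchange-key", "via direct exchange");
        broker.basicPublish("", "test-Queue", "via default exchange");
        System.out.println("messages in test-Queue: " + broker.depth("test-Queue"));
    }
}
```

A message published with a routing key that matches no binding reaches no queue in this model, which is why the bind step matters in the demo.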
3: Step-by-step analysis
3.0 Obtain a connection (Connection)
public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
        throws IOException, TimeoutException {
    if (this.metricsCollector == null) {
        this.metricsCollector = new NoOpMetricsCollector();
    }
    // make sure we respect the provided thread factory
    FrameHandlerFactory fhFactory = createFrameHandlerFactory();
    ConnectionParams params = params(executor);
    // set client-provided via a client property
    if (clientProvidedName != null) {
        Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
        properties.put("connection_name", clientProvidedName);
        params.setClientProperties(properties);
    }
    // If automatic recovery is enabled
    if (isAutomaticRecoveryEnabled()) {
        // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
        // Create the recovery-aware connection
        AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
        // Initialize it
        conn.init();
        return conn;
    } else {
        // Resolve the candidate addresses
        List<Address> addrs = addressResolver.getAddresses();
        Exception lastException = null;
        for (Address addr : addrs) {
            try {
                // Create the FrameHandler for this address
                FrameHandler handler = fhFactory.create(addr, clientProvidedName);
                // Create and start the connection
                AMQConnection conn = createConnection(params, handler, metricsCollector);
                conn.start();
                this.metricsCollector.newConnection(conn);
                return conn;
            } catch (IOException e) {
                lastException = e;
            } catch (TimeoutException te) {
                lastException = te;
            }
        }
        // Every address failed: rethrow the last failure with its original type
        if (lastException != null) {
            if (lastException instanceof IOException) {
                throw (IOException) lastException;
            } else if (lastException instanceof TimeoutException) {
                throw (TimeoutException) lastException;
            }
        }
        throw new IOException("failed to connect");
    }
}
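The non-recovery branch of newConnection is a "try each address, remember the last failure" loop. The sketch below isolates that pattern with the standard library only. The Connector interface and the connectToFirstAvailable method are hypothetical names introduced for illustration, and the sketch handles only IOException, whereas the client also tracks TimeoutException.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the address-fallback loop (not the client's code).
class AddressFallbackSketch {
    // Stand-in for "create a FrameHandler and start a connection on one address".
    interface Connector<T> {
        T connect(String address) throws IOException;
    }

    static <T> T connectToFirstAvailable(List<String> addresses, Connector<T> connector)
            throws IOException {
        IOException lastException = null;
        for (String address : addresses) {
            try {
                // The first address that connects wins.
                return connector.connect(address);
            } catch (IOException e) {
                // Remember the failure and move on to the next address.
                lastException = e;
            }
        }
        if (lastException != null) {
            throw lastException;
        }
        // An empty address list never entered the loop.
        throw new IOException("failed to connect");
    }

    public static void main(String[] args) throws IOException {
        List<String> addresses = Arrays.asList("10.0.0.1:5672", "127.0.0.1:5672");
        String result = connectToFirstAvailable(addresses, address -> {
            if (!address.startsWith("127.")) {
                throw new IOException("unreachable: " + address);
            }
            return "connected to " + address;
        });
        System.out.println(result);
    }
}
```

Only the last exception survives, so when several addresses fail for different reasons, the earlier causes are lost to the caller.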
3.1 Create a channel (Channel)
@Override
public Channel createChannel() throws IOException {
    // Make sure the connection is open
    ensureIsOpen();
    ChannelManager cm = _channelManager;
    if (cm == null) return null;
    // The key step: the ChannelManager creates the channel
    Channel channel = cm.createChannel(this);
    // Report the new channel to the metricsCollector
    metricsCollector.newChannel(channel);
    return channel;
}
The ChannelManager createChannel method:
Its main job is to obtain a channelNumber from channelNumberAllocator. The number acts as the channel's unique identifier within the connection; the allocator's implementation is worth reading on its own.
public ChannelN createChannel(AMQConnection connection) throws IOException {
    ChannelN ch;
    synchronized (this.monitor) {
        // Ask channelNumberAllocator for a free channelNumber
        int channelNumber = channelNumberAllocator.allocate();
        if (channelNumber == -1) {
            // No free channel number is left
            return null;
        } else {
            ch = addNewChannel(connection, channelNumber);
        }
    }
    ch.open(); // now that it's been safely added
    return ch;
}
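To make the allocate() contract above concrete (a free number, or -1 when none is left), here is a minimal allocator sketch built on java.util.BitSet. It is an assumption-laden stand-in, not the client's allocator: the class name ChannelNumberAllocatorSketch, its constructor, and the free method are hypothetical. Numbering starts at 1 because AMQP 0-9-1 reserves channel 0 for the connection itself.

```java
import java.util.BitSet;

// Hypothetical channel-number allocator (not the client's implementation).
class ChannelNumberAllocatorSketch {
    private final int lowest;
    private final int highest;
    // Bit i is set when number (lowest + i) is in use.
    private final BitSet used = new BitSet();

    ChannelNumberAllocatorSketch(int lowest, int highest) {
        if (lowest > highest) {
            throw new IllegalArgumentException("lowest must not exceed highest");
        }
        this.lowest = lowest;
        this.highest = highest;
    }

    // Returns the smallest free number, or -1 when the range is exhausted.
    synchronized int allocate() {
        int offset = used.nextClearBit(0);
        int candidate = lowest + offset;
        if (candidate > highest) {
            return -1;
        }
        used.set(offset);
        return candidate;
    }

    // Makes a previously allocated number available again.
    synchronized void free(int number) {
        if (number < lowest || number > highest) {
            throw new IllegalArgumentException("number out of range: " + number);
        }
        used.clear(number - lowest);
    }

    public static void main(String[] args) {
        ChannelNumberAllocatorSketch allocator = new ChannelNumberAllocatorSketch(1, 3);
        int first = allocator.allocate();
        int second = allocator.allocate();
        allocator.free(first);
        // The freed number is handed out again before any higher one.
        System.out.println(first + " " + second + " " + allocator.allocate());
    }
}
```

Reusing the lowest free number keeps channel numbers small, which matters because the usable range is bounded by the channel-max value negotiated with the broker.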
3.2 Declare an exchange (Exchange)
public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException {
    final AMQP.Exchange.DeclareOk ok = delegate.exchangeDeclare(exchange, type, durable, autoDelete, internal, arguments);
    RecordedExchange x = new RecordedExchange(this, exchange)
            .type(type)
            .durable(durable)
            .autoDelete(autoDelete)
            .arguments(arguments);
    // Record the exchange that was just declared
    recordExchange(exchange, x);
    return ok;
}
Step into delegate.exchangeDeclare: once it returns, the exchange has been created and is visible in the management console.
3.3 Declare a queue (Queue)
@Override
public AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
    // Once this call returns, the queue has been created
    final AMQP.Queue.DeclareOk ok = delegate.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
    RecordedQueue q = new RecordedQueue(this, ok.getQueue())
            .durable(durable)
            .exclusive(exclusive)
            .autoDelete(autoDelete)
            .arguments(arguments);
    if (queue.equals(RecordedQueue.EMPTY_STRING)) {
        q.serverNamed(true);
    }
    recordQueue(ok, q);
    return ok;
}

@Override
public Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,
                                    boolean autoDelete, Map<String, Object> arguments)
        throws IOException {
    validateQueueNameLength(queue);
    // Declare the queue through an RPC call to the broker
    return (Queue.DeclareOk)
            exnWrappingRpc(new Queue.Declare.Builder()
                    .queue(queue)
                    .durable(durable)
                    .exclusive(exclusive)
                    .autoDelete(autoDelete)
                    .arguments(arguments)
                    .build())
            .getMethod();
}
At this point the queue exists but is not yet bound to the exchange, so the binding step below is still required.
3.4 Bind the queue
@Override
public AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException {
    // Bind the queue, then record the binding
    AMQP.Queue.BindOk ok = delegate.queueBind(queue, exchange, routingKey, arguments);
    recordQueueBinding(queue, exchange, routingKey, arguments);
    return ok;
}

@Override
public Queue.BindOk queueBind(String queue, String exchange,
                              String routingKey, Map<String, Object> arguments)
        throws IOException {
    validateQueueNameLength(queue);
    // Declare the binding through an RPC call to the broker
    return (Queue.BindOk)
            exnWrappingRpc(new Queue.Bind.Builder()
                    .queue(queue)
                    .exchange(exchange)
                    .routingKey(routingKey)
                    .arguments(arguments)
                    .build())
            .getMethod();
}
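The recordExchange, recordQueue, and recordQueueBinding calls in 3.2 to 3.4 keep a client-side copy of what was declared, so that a recovering connection can declare it again. The sketch below shows one way such a record could be replayed in dependency order. It is a simplified assumption, not the client's recovery code: TopologyRecorderSketch and replayPlan are hypothetical names, and the plan is a list of descriptive strings rather than real RPC calls.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record-and-replay sketch (not the client's recovery code).
class TopologyRecorderSketch {
    // exchange name -> exchange type, in declaration order
    private final Map<String, String> exchanges = new LinkedHashMap<>();
    // queue name -> durable flag, in declaration order
    private final Map<String, Boolean> queues = new LinkedHashMap<>();
    // each entry is {queue, exchange, routingKey}
    private final List<String[]> bindings = new ArrayList<>();

    void recordExchange(String name, String type) {
        exchanges.put(name, type);
    }

    void recordQueue(String name, boolean durable) {
        queues.put(name, durable);
    }

    void recordQueueBinding(String queue, String exchange, String routingKey) {
        bindings.add(new String[] {queue, exchange, routingKey});
    }

    // Exchanges and queues come first, because a binding needs both to exist.
    List<String> replayPlan() {
        List<String> plan = new ArrayList<>();
        exchanges.forEach((name, type) -> plan.add("exchange.declare " + name + " " + type));
        queues.forEach((name, durable) -> plan.add("queue.declare " + name + " durable=" + durable));
        for (String[] b : bindings) {
            plan.add("queue.bind " + b[0] + " " + b[1] + " " + b[2]);
        }
        return plan;
    }

    public static void main(String[] args) {
        TopologyRecorderSketch recorder = new TopologyRecorderSketch();
        recorder.recordExchange("test-Exchange", "direct");
        recorder.recordQueue("test-Queue", true);
        recorder.recordQueueBinding("test-Queue", "test-Exchange", "test-Exchange-key");
        recorder.replayPlan().forEach(System.out::println);
    }
}
```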
3.5 Publish the message
@Override
public void basicPublish(String exchange, String routingKey,
                         boolean mandatory, boolean immediate,
                         BasicProperties props, byte[] body)
        throws IOException {
    // Track the sequence number of this publish while it is unconfirmed
    if (nextPublishSeqNo > 0) {
        unconfirmedSet.add(getNextPublishSeqNo());
        nextPublishSeqNo++;
    }
    if (props == null) {
        props = MessageProperties.MINIMAL_BASIC;
    }
    // Assemble the message: method, properties, and body
    AMQCommand command = new AMQCommand(
            new Basic.Publish.Builder()
                    .exchange(exchange)
                    .routingKey(routingKey)
                    .mandatory(mandatory)
                    .immediate(immediate)
                    .build(), props, body);
    try {
        // Send the message
        transmit(command);
    } catch (IOException e) {
        metricsCollector.basicPublishFailure(this, e);
        throw e;
    }
    // Report the publish on this channel to the metricsCollector
    metricsCollector.basicPublish(this);
}
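The nextPublishSeqNo and unconfirmedSet bookkeeping at the top of basicPublish belongs to publisher confirms: every publish gets a sequence number that stays in the set until the broker acknowledges it. The sketch below models that bookkeeping in isolation. It assumes that tracking starts at sequence number 1 once confirm mode is switched on and that a "multiple" ack covers every number up to and including the acked one; PublishConfirmTrackerSketch and its methods are hypothetical names, and the sketch is single-threaded.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical model of publisher-confirm bookkeeping (not the client's code).
class PublishConfirmTrackerSketch {
    // 0 means confirm mode is off and nothing is tracked.
    private long nextPublishSeqNo = 0L;
    private final SortedSet<Long> unconfirmedSet = new TreeSet<>();

    // Assumption: enabling confirm mode starts the sequence at 1.
    void confirmSelect() {
        if (nextPublishSeqNo == 0L) {
            nextPublishSeqNo = 1L;
        }
    }

    // Mirrors the check at the top of basicPublish.
    void onPublish() {
        if (nextPublishSeqNo > 0) {
            unconfirmedSet.add(nextPublishSeqNo);
            nextPublishSeqNo++;
        }
    }

    // A "multiple" ack confirms every sequence number up to and including seqNo.
    void onAck(long seqNo, boolean multiple) {
        if (multiple) {
            unconfirmedSet.headSet(seqNo + 1).clear();
        } else {
            unconfirmedSet.remove(seqNo);
        }
    }

    int pending() {
        return unconfirmedSet.size();
    }

    public static void main(String[] args) {
        PublishConfirmTrackerSketch tracker = new PublishConfirmTrackerSketch();
        tracker.confirmSelect();
        tracker.onPublish();
        tracker.onPublish();
        tracker.onPublish();
        tracker.onAck(2, true);
        System.out.println("still unconfirmed: " + tracker.pending());
    }
}
```

In the demo from section 2.2 confirm mode is never enabled, so the sequence number stays at 0 and this branch of basicPublish is skipped.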
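4: Summary
Publishing a message comes down to the following:
- Messages are sent to the RabbitMQ server through a Channel
- The exchange, queue, and binding are declared through RPC calls
- The message itself is sent over the AMQP protocol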