一.介绍mq
1.mq解决问题
1)异步调用
服务A可以给mq发送消息,然后服务B监听mq,进行消费消息,这样做不需要服务B实时的去处理服务A的请求,可以慢慢消费服务A的请求,异步调用使得服务B不需要立刻处理大量的请求,给了服务B缓冲。
2)削峰填谷
在某一时刻发生了大量请求的时候,可以使用mq把请求存起来,避免服务直接面临峰值流量,把消息存起来,慢慢消费,这样,就把峰值的流量削下来了。
3)服务解耦
可以使用mq来存储消息,并把消息发给需要的服务,将联系密切的服务进行解耦。
2.mq优势
1)使用AMQP协议
2)使用erlang语言(并发程度高,延迟低)
3)支持主流的开发语言。
二。mq架构图
三.引入依赖
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency>
</dependencies>
四.mq种类
1)Work Queues
工作队列模式下,会把消息发送给默认交换机,之后发送给队列,队列默认会对消费者进行轮询发放。(不考虑消费速度)
package consumer;import com.rabbitmq.client.*;
import org.junit.Test;
import util.RabbitMQConnectionUtil;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class Consumer {public static String QUEUE = "HELLO";@Testpublic void consumer1() throws Exception {Connection connection= RabbitMQConnectionUtil.getConnection();//2.获取信道channelChannel channel = connection.createChannel();//3.声明队列channel.queueDeclare(QUEUE,false,false,false,null);//3.5设置流量管控channel.basicQos(3);//4.监听消息DefaultConsumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {System.out.println("消费者1好收到消息"+new String(body,"utf-8"));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(QUEUE,false,callback);System.in.read();}@Testpublic void consumer2() throws Exception {Connection connection= RabbitMQConnectionUtil.getConnection();//2.获取信道channelChannel channel = connection.createChannel();//3.声明队列channel.queueDeclare(QUEUE,false,false,false,null);//3.5设置流量管控channel.basicQos(3);//4.监听消息DefaultConsumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2好收到消息"+new String(body,"utf-8"));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(QUEUE,false,callback);System.in.read();}
}
通过将ack设置为false,使用channel.basicQos来设置一次拿的消息数量,实现按照流量处理速度来获取相应数量的消息。
2)Publish/Subscribe
FANOUT模式下不需要设置routingkey,另外,将消息发送到交换机绑定的所有队列中
package com.gyx.pubsub;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import util.RabbitMQConnectionUtil;import java.nio.charset.StandardCharsets;public class Publisher {public String Exchange = "Exchange_Publishpubsb";public String Queue1_NAME = "QUEUE1";public String Queue2_NAME = "QUEUE2";//1.建立连接@Testpublic void pubsbPublisher() throws Exception {//1.建立连接Connection connection = RabbitMQConnectionUtil.getConnection();//2.建立信道Channel channel = connection.createChannel();//3.构建交换机channel.exchangeDeclare(Exchange, BuiltinExchangeType.FANOUT);//4.构建队列channel.queueDeclare(Queue1_NAME,false,false,false,null);channel.queueDeclare(Queue2_NAME,false,false,false,null);//5.绑定交换机,队列channel.queueBind(Queue1_NAME,Exchange,"");channel.queueBind(Queue2_NAME,Exchange,"");//6.发送消息channel.basicPublish(Exchange,"",null,"nihao".getBytes(StandardCharsets.UTF_8));System.in.read();}
}
3)routing
package com.gyx.routing;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import util.RabbitMQConnectionUtil;import java.nio.charset.StandardCharsets;public class Publisher {public String EXCHANGE_NAME = "EXCHANGE_ROUTING";public String QUEUE1 = "Rouitng_Queue1";public String QUEUE2 = "Routing_Queue2";@Testpublic void Publish() throws Exception {//1.建立连接Connection connection= RabbitMQConnectionUtil.getConnection();//2.建立信道Channel channel = connection.createChannel();//3.声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//4.声明队列channel.queueDeclare(QUEUE1,false,false,false,null);channel.queueDeclare(QUEUE2,false,false,false,null);//5.绑定交换机channel.queueBind(QUEUE1,EXCHANGE_NAME,"orange");channel.queueBind(QUEUE2,EXCHANGE_NAME,"black");channel.queueBind(QUEUE2,EXCHANGE_NAME,"green");//6.发送消息channel.basicPublish(EXCHANGE_NAME,"orange",null,"orange111".getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,"black",null,"black111".getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,"white",null,"white111".getBytes(StandardCharsets.UTF_8));System.in.read();}
}
4)topic
package com.gyx.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import util.RabbitMQConnectionUtil;import java.nio.charset.StandardCharsets;public class Publisher {public String EXCHANGE_NAME = "EXCHANGE_TOPIC";public String QUEUE1 = "Topic_Queue1";public String QUEUE2 = "Topic_Queue2";@Testpublic void Publish() throws Exception {//1.建立连接Connection connection= RabbitMQConnectionUtil.getConnection();//2.建立信道Channel channel = connection.createChannel();//3.声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//4.声明队列channel.queueDeclare(QUEUE1,false,false,false,null);channel.queueDeclare(QUEUE2,false,false,false,null);//5.绑定交换机channel.queueBind(QUEUE1,EXCHANGE_NAME,"*.orange.*");channel.queueBind(QUEUE2,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(QUEUE2,EXCHANGE_NAME,"lazy.#");//6.发送消息channel.basicPublish(EXCHANGE_NAME,"bid.orange.rabbit",null,"大橙色兔".getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,"small.white.rabbit",null,"小白兔".getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,"lazy.sadadad",null,"懒惰".getBytes(StandardCharsets.UTF_8));System.in.read();}
}
5)rpc
代码如下:
package com.gyx.rpc;import com.rabbitmq.client.*;
import org.junit.Test;
import util.RabbitMQConnectionUtil;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;public class Publisher {public static final String QUEUE_PUBLISHER = "rpc_publisher";public static final String QUEUE_CONSUMER = "rpc_consumer";//客户端@Testpublic void Publish() throws Exception {Connection connection= RabbitMQConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);String uuid = UUID.randomUUID().toString();AMQP.BasicProperties properties= new AMQP.BasicProperties().builder().replyTo(QUEUE_CONSUMER).correlationId(uuid).build();channel.basicPublish("",QUEUE_PUBLISHER,properties,"hello RPC".getBytes(StandardCharsets.UTF_8));channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String uid= properties.getCorrelationId();if(uid != null && uid.equals(uuid)){System.out.println("客户端接收相应"+new String(body,"UTF-8"));}channel.basicAck(envelope.getDeliveryTag(),false);}});System.in.read();}@Testpublic void Consumer() throws Exception {Connection connection= RabbitMQConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);String uuid = UUID.randomUUID().toString();channel.basicConsume(QUEUE_PUBLISHER,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String uid= properties.getCorrelationId();String replyTo = properties.getReplyTo();AMQP.BasicProperties pro =new AMQP.BasicProperties().builder().correlationId(uid).build();channel.basicPublish("",replyTo,pro,"服务端处理消息".getBytes(StandardCharsets.UTF_8));channel.basicAck(envelope.getDeliveryTag(),false);}});System.in.read();}
}