您的位置:首页 > 房产 > 家装 > 设计素材网站哪个最好用_中小型网站建设与网络搭建_cdq百度指数_网络营销方案案例

设计素材网站哪个最好用_中小型网站建设与网络搭建_cdq百度指数_网络营销方案案例

2025/3/18 23:15:18 来源:https://blog.csdn.net/m0_73700505/article/details/144201444  浏览:    关键词:设计素材网站哪个最好用_中小型网站建设与网络搭建_cdq百度指数_网络营销方案案例
设计素材网站哪个最好用_中小型网站建设与网络搭建_cdq百度指数_网络营销方案案例

一.介绍mq

1.mq解决问题

1)异步调用

服务A可以给mq发送消息,然后服务B监听mq,进行消费消息,这样做不需要服务B实时的去处理服务A的请求,可以慢慢消费服务A的请求,异步调用使得服务B不需要立刻处理大量的请求,给了服务B缓冲。

2)削峰填谷

在某一时刻发生了大量请求的时候,可以使用mq把请求存起来,避免服务直接面临峰值流量,把消息存起来,慢慢消费,这样,就把峰值的流量削下来了。

3)服务解耦

可以使用mq来存储消息,并把消息发给需要的服务,将联系密切的服务进行解耦。

2.mq优势

1)使用AMQP协议

2)使用erlang语言(并发程度高,延迟低)

3)支持主流的开发语言。

二。mq架构图

三.引入依赖

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency>
</dependencies>

四.mq种类

1)Work Queues

工作队列模式下,会把消息发送给默认交换机,之后发送给队列,队列默认会对消费者进行轮询发放。(不考虑消费速度)

package consumer;import com.rabbitmq.client.*;
import org.junit.Test;
import util.RabbitMQConnectionUtil;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class Consumer {public static String QUEUE = "HELLO";@Testpublic void consumer1() throws Exception {Connection connection= RabbitMQConnectionUtil.getConnection();//2.获取信道channelChannel channel = connection.createChannel();//3.声明队列channel.queueDeclare(QUEUE,false,false,false,null);//3.5设置流量管控channel.basicQos(3);//4.监听消息DefaultConsumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {System.out.println("消费者1好收到消息"+new String(body,"utf-8"));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(QUEUE,false,callback);System.in.read();}@Testpublic void consumer2() throws Exception {Connection connection= RabbitMQConnectionUtil.getConnection();//2.获取信道channelChannel channel = connection.createChannel();//3.声明队列channel.queueDeclare(QUEUE,false,false,false,null);//3.5设置流量管控channel.basicQos(3);//4.监听消息DefaultConsumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2好收到消息"+new String(body,"utf-8"));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(QUEUE,false,callback);System.in.read();}
}

通过将ack设置为false,使用channel.basicQos来设置一次拿的消息数量,实现按照流量处理速度来获取相应数量的消息。

2)Publish/Subscribe

FANOUT模式下不需要设置routingkey,另外,将消息发送到交换机绑定的所有队列中

package com.gyx.pubsub;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import util.RabbitMQConnectionUtil;import java.nio.charset.StandardCharsets;public class Publisher {public String Exchange = "Exchange_Publishpubsb";public String Queue1_NAME = "QUEUE1";public String Queue2_NAME = "QUEUE2";//1.建立连接@Testpublic void pubsbPublisher() throws Exception {//1.建立连接Connection connection = RabbitMQConnectionUtil.getConnection();//2.建立信道Channel channel = connection.createChannel();//3.构建交换机channel.exchangeDeclare(Exchange, BuiltinExchangeType.FANOUT);//4.构建队列channel.queueDeclare(Queue1_NAME,false,false,false,null);channel.queueDeclare(Queue2_NAME,false,false,false,null);//5.绑定交换机,队列channel.queueBind(Queue1_NAME,Exchange,"");channel.queueBind(Queue2_NAME,Exchange,"");//6.发送消息channel.basicPublish(Exchange,"",null,"nihao".getBytes(StandardCharsets.UTF_8));System.in.read();}
}

3)routing

package com.gyx.routing;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import util.RabbitMQConnectionUtil;import java.nio.charset.StandardCharsets;public class Publisher {public String EXCHANGE_NAME = "EXCHANGE_ROUTING";public String QUEUE1 = "Rouitng_Queue1";public String QUEUE2 = "Routing_Queue2";@Testpublic void Publish() throws Exception {//1.建立连接Connection connection= RabbitMQConnectionUtil.getConnection();//2.建立信道Channel channel = connection.createChannel();//3.声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//4.声明队列channel.queueDeclare(QUEUE1,false,false,false,null);channel.queueDeclare(QUEUE2,false,false,false,null);//5.绑定交换机channel.queueBind(QUEUE1,EXCHANGE_NAME,"orange");channel.queueBind(QUEUE2,EXCHANGE_NAME,"black");channel.queueBind(QUEUE2,EXCHANGE_NAME,"green");//6.发送消息channel.basicPublish(EXCHANGE_NAME,"orange",null,"orange111".getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,"black",null,"black111".getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,"white",null,"white111".getBytes(StandardCharsets.UTF_8));System.in.read();}
}

4)topic

package com.gyx.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import util.RabbitMQConnectionUtil;import java.nio.charset.StandardCharsets;public class Publisher {public String EXCHANGE_NAME = "EXCHANGE_TOPIC";public String QUEUE1 = "Topic_Queue1";public String QUEUE2 = "Topic_Queue2";@Testpublic void Publish() throws Exception {//1.建立连接Connection connection= RabbitMQConnectionUtil.getConnection();//2.建立信道Channel channel = connection.createChannel();//3.声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//4.声明队列channel.queueDeclare(QUEUE1,false,false,false,null);channel.queueDeclare(QUEUE2,false,false,false,null);//5.绑定交换机channel.queueBind(QUEUE1,EXCHANGE_NAME,"*.orange.*");channel.queueBind(QUEUE2,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(QUEUE2,EXCHANGE_NAME,"lazy.#");//6.发送消息channel.basicPublish(EXCHANGE_NAME,"bid.orange.rabbit",null,"大橙色兔".getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,"small.white.rabbit",null,"小白兔".getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,"lazy.sadadad",null,"懒惰".getBytes(StandardCharsets.UTF_8));System.in.read();}
}

5)rpc

代码如下:

package com.gyx.rpc;import com.rabbitmq.client.*;
import org.junit.Test;
import util.RabbitMQConnectionUtil;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;public class Publisher {public static final String QUEUE_PUBLISHER = "rpc_publisher";public static final String QUEUE_CONSUMER = "rpc_consumer";//客户端@Testpublic void Publish() throws Exception {Connection connection= RabbitMQConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);String uuid = UUID.randomUUID().toString();AMQP.BasicProperties properties= new AMQP.BasicProperties().builder().replyTo(QUEUE_CONSUMER).correlationId(uuid).build();channel.basicPublish("",QUEUE_PUBLISHER,properties,"hello RPC".getBytes(StandardCharsets.UTF_8));channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String uid= properties.getCorrelationId();if(uid != null && uid.equals(uuid)){System.out.println("客户端接收相应"+new String(body,"UTF-8"));}channel.basicAck(envelope.getDeliveryTag(),false);}});System.in.read();}@Testpublic void Consumer() throws Exception {Connection connection= RabbitMQConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);String uuid = UUID.randomUUID().toString();channel.basicConsume(QUEUE_PUBLISHER,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String uid= properties.getCorrelationId();String replyTo = properties.getReplyTo();AMQP.BasicProperties pro =new AMQP.BasicProperties().builder().correlationId(uid).build();channel.basicPublish("",replyTo,pro,"服务端处理消息".getBytes(StandardCharsets.UTF_8));channel.basicAck(envelope.getDeliveryTag(),false);}});System.in.read();}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com