您的位置:首页 > 财经 > 金融 > 邢台网站改版制作公司_公司起名字大全免费两个字_关键词林俊杰的寓意_百度网页排名怎么提升

邢台网站改版制作公司_公司起名字大全免费两个字_关键词林俊杰的寓意_百度网页排名怎么提升

2024/12/22 13:16:17 来源:https://blog.csdn.net/m0_73992740/article/details/144287973  浏览:    关键词:邢台网站改版制作公司_公司起名字大全免费两个字_关键词林俊杰的寓意_百度网页排名怎么提升
邢台网站改版制作公司_公司起名字大全免费两个字_关键词林俊杰的寓意_百度网页排名怎么提升

文章目录

  • 六. RPC(RPC通信模式)
    • 客户端
    • 服务端
  • 七. Publisher Confirms(发布确认模式)
    • 1. Publishing Messages Individually(单独确认)
    • 2. Publishing Messages in Batches(批量确认)
    • 3. Handling Publisher Confirms Asynchronously(异步确认)

六. RPC(RPC通信模式)

在这里插入图片描述
在这里插入图片描述

  1. 客⼾端发送消息到⼀个指定的队列, 并在消息属性中设置replyTo字段, 这个字段指定了⼀个回调队列, ⽤于接收服务端的响应.
  2. 服务端接收到请求后, 处理请求并发送响应消息到replyTo指定的回调队列
  3. 客⼾端在回调队列上等待响应消息. ⼀旦收到响应,客⼾端会检查消息的correlationId属性,以确保它是所期望的响应

公共代码:

 	public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";

客户端

客户端需要完成两件事, 发送请求, 接收响应
发送请求:

//发送请求://声明队列channel.queueDeclare(Common.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Common.RPC_RESPONSE_QUEUE, true, false, false, null);//定义回调队列String replyQueueName = Common.RPC_RESPONSE_QUEUE;//本次请求的唯一标识String corrId = UUID.randomUUID().toString();//生成发送消息的属性AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();//通过内置交换机, 发送消息String message = "hello, rpc......";channel.basicPublish("", Common.RPC_REQUEST_QUEUE, props, message.getBytes(StandardCharsets.UTF_8));

接收响应:

 //接收响应://使用阻塞队列来存储回调结果, 避免了客户端反复访问队列final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);//接收服务器的响应DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));//如果唯一标识正确, 放在阻塞队列中if(properties.getCorrelationId().equals(corrId)){response.offer(new String(body));}}};channel.basicConsume(replyQueueName, true, consumer);//获取回调的结果String result = response.take();System.out.println("[RPCClient] Result: " + result);

服务端

在这里插入图片描述

		//设置同时最多只能获取一个消息channel.basicQos(1);//接收消息, 并对消息进行应答DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();String message = new String(body);String response = "接收到消息 request: " + message;channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));//对消息进行应答channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Common.RPC_REQUEST_QUEUE, false, consumer);//自动应答设置成false, 在成功回调后, 再进行手动应答

在这里插入图片描述

七. Publisher Confirms(发布确认模式)

在这里插入图片描述

Publisher Confirms模式是RabbitMQ提供的⼀种确保消息可靠发送到RabbitMQ服务器的机制。在这种模式下,⽣产者可以等待RabbitMQ服务器的确认,以确保消息已经被服务器接收并处理.

  1. ⽣产者将Channel设置为confirm模式(通过调⽤channel.confirmSelect()完成)后, 发布的每⼀条消息都会获得⼀个唯⼀的ID, ⽣产者可以将这些序列号与消息关联起来,以便跟踪消息的状态.
  2. 当消息被RabbitMQ服务器接收并处理后,服务器会异步地向⽣产者发送⼀个确认(ACK)给⽣产者(包含消息的唯⼀ID),表明消息已经送达.
    通过Publisher Confirms模式,⽣产者可以确保消息被RabbitMQ服务器成功接收, 从⽽避免消息丢失的问题.
    适⽤场景: 对数据安全性要求较⾼的场景. ⽐如⾦融交易, 订单处理

⽣产者将信道设置成confirm(确认)模式, ⼀旦信道进⼊confirm模式, 所有在该信道上⾯发布的消息都会被指派⼀个唯⼀的ID(从1开始), ⼀旦消息被投递到所有匹配的队列之后, RabbitMQ就会发送⼀个确认给⽣产者(包含消息的唯⼀ID), 这就使得⽣产者知道消息已经正确到达⽬的队列了, 如果消息和队列是可持久化的, 那么确认消息会在将消息写⼊磁盘之后发出. broker回传给⽣产者的确认消息中deliveryTag 包含了确认消息的序号, 此外 broker 也可以设置channel.basicAck⽅法中的multiple参数, 表⽰到这个序号之前的所有消息都已经得到了处理.
在这里插入图片描述

使⽤发送确认机制, 必须要信道设置成confirm(确认)模式

发布确认有3种策略:

1. Publishing Messages Individually(单独确认)

public static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {try(Connection connection = createConnection()){//创建channelChannel channel = connection.createChannel();//开启信道确认模式channel.confirmSelect();channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);Long start = System.currentTimeMillis();for(int i = 0; i < MESSAGE_COUNT; i++){String body = "消息" + i;channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE1, null, body.getBytes(StandardCharsets.UTF_8));//等待确认消息, 只要消息被确认, 这个方法就会被返回//如果超时过期, 则抛出TimeoutException, 如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOExceptionchannel.waitForConfirmsOrDie();}Long end = System.currentTimeMillis(5000);System.out.printf("Published %d message individually in %d ms", MESSAGE_COUNT, end - start);}}

在这里插入图片描述

2. Publishing Messages in Batches(批量确认)

    private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {try(Connection connection = createConnection()){Channel channel = connection.createChannel();channel.confirmSelect();channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//批量个数int batchSize = 100;int outstandingMessageCount = 0;long start = System.currentTimeMillis();for(int i = 0; i < MESSAGE_COUNT; i++){String body = "消息" + i;channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE2, null, body.getBytes(StandardCharsets.UTF_8));outstandingMessageCount++;if(outstandingMessageCount == batchSize){channel.waitForConfirmsOrDie(5000);outstandingMessageCount = 0;}}if(outstandingMessageCount > 0){channel.waitForConfirms(5000);}long end = System.currentTimeMillis();System.out.printf("Published %d message batch in %d ms", MESSAGE_COUNT, end - start);}}

在这里插入图片描述
相⽐于单独确认策略, 批量确认极⼤地提升了confirm的效率, 缺点是出现Basic.Nack或者超时时, 我们不清楚具体哪条消息出了问题. 客⼾端需要将这⼀批次的消息全部重发, 这会带来明显的重复消息数量, 当消息经常丢失时,批量确认的性能应该是不升反降的

3. Handling Publisher Confirms Asynchronously(异步确认)

异步confirm⽅法的编程实现最为复杂. Channel 接⼝提供了⼀个⽅法addConfirmListener. 这个⽅法可以添加ConfirmListener 回调接⼝.
ConfirmListener 接⼝中包含两个⽅法: handleAck(long deliveryTag, boolean multiple) 和 handleNack(long deliveryTag, boolean multiple) , 分别对应处理RabbitMQ发送给⽣产者的ack和nack.
deliveryTag 表⽰发送消息的序号. multiple 表⽰是否批量确认.
我们需要为每⼀个Channel 维护⼀个已发送消息的序号集合. 当收到RabbitMQ的confirm 回调时, 从集合中删除对应的消息. 当Channel开启confirm模式后, channel上发送消息都会附带⼀个从1开始递增的deliveryTag序号. 我们可以使⽤SortedSet 的有序性来维护这个已发消息的集合.

  1. 当收到ack时, 从序列中删除该消息的序号. 如果为批量确认消息, 表⽰⼩于等于当前序号deliveryTag的消息都收到了, 则清除对应集合
  2. 当收到nack时, 处理逻辑类似, 不过需要结合具体的业务情况, 进⾏消息重发等操作
private static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException{try(Connection connection = createConnection()){Channel channel = connection.createChannel();channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);channel.confirmSelect();//有序集合, 元素按照自然顺序进行排序, 存储未confirm消息序号SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if(multiple){//批量confirmSet.headSet(deliveryTag + 1).clear();}else{confirmSet.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if(multiple){//批量confirmSet.headSet(deliveryTag + 1).clear();}else{confirmSet.remove(deliveryTag);}//如果处理失败, 需要有消息重发的环节, 此处省略}});long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE3, null, message.getBytes(StandardCharsets.UTF_8));confirmSet.add(nextPublishSeqNo);}while(!confirmSet.isEmpty()){Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("Published %d message ConfirmsAsynchronously in %d ms", MESSAGE_COUNT, end - start);}}

在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com