您的位置:首页 > 新闻 > 热点要闻 > 汽车报价网址_制作网页可以用word吗_免费的网站关键词查询工具_aso优化注意什么

汽车报价网址_制作网页可以用word吗_免费的网站关键词查询工具_aso优化注意什么

2024/12/21 21:47:31 来源:https://blog.csdn.net/laozengsky/article/details/142358191  浏览:    关键词:汽车报价网址_制作网页可以用word吗_免费的网站关键词查询工具_aso优化注意什么
汽车报价网址_制作网页可以用word吗_免费的网站关键词查询工具_aso优化注意什么

使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息)

gRpc协议类定义

message AdMsgProto{
optional string msg=1;
optional string tag=2;
optional string topic=3;
}
2. service方法定义
service MQDataService{
rpc sendRedissonMsg(AdMsgProto)returns (Code);
rpc receiveRedissonMsg(String)returns (stream AdMsgProto);
}

服务端写法

package com.mykkhw.mykkhw_data_mq.service.grpc;import com.mykkhw.mykkhw_data_protocols.Base.ReqDataProto;
import com.mykkhw.mykkhw_data_protocols.Base.ResultProto;
import com.mykkhw.mykkhw_data_protocols.Base.ResultType;
import com.mykkhw.mykkhw_data_protocols.MQ.TiktokMsgProto;
import com.mykkhw.mykkhw_data_protocols.MQService.MQDataServiceGrpc;
import io.grpc.stub.StreamObserver;
import org.redisson.api.RBlockingQueue;
import org.springframework.util.StringUtils;public class MqRpcService extends MQDataServiceGrpc.MQDataServiceImplBase {@Overridepublic void sendRedissonMsg(AdMsgProto request, StreamObserver<ResultProto> responseObserver) {RpcServicePools.mqProducer.sendMsg(request.getMsg(), request.getTag(), request.getTopic());ResultProto.Builder builder = ResultProto.newBuilder();builder.setCode(ResultType.SUCCESS);responseObserver.onNext(builder.build());responseObserver.onCompleted();}@Overridepublic void receiveRedissonMsg(ReqDataProto request, StreamObserver<AdMsgProto> responseObserver) {try {RBlockingQueue<String> queue = RpcServicePools.redisson.getBlockingQueue(request.getName());// 循环处理消息while (!Thread.currentThread().isInterrupted()) {// 阻塞式获取消息,没有消息时线程会等待String message = queue.take();if(StringUtils.hasText(message)){AdMsgProto.Builder builder = AdMsgProto.newBuilder();builder.setMsg(message);...responseObserver.onNext(builder.build());}}} catch (Exception e) {Thread.currentThread().interrupt();  // 重新设置线程的中断标志}responseObserver.onCompleted();}
}//mq依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.15.0</version><scope>compile</scope>
</dependency>

客户端写法

 // 消息生产者private void buildAppMSg(...param) {...// 发送mq消息ClientManager.getMqDataServiceFutureStub().sendRedissonMsg(adMsg.build());}//消费者流式接收public static void receiveFacebookMsg() {try {log.info("facebook msg");// 处理服务器流式响应StreamObserver<AdMsgProto> responseObserver = new StreamObserver<AdMsgProto>() {@Overridepublic void onNext(AdMsgProto msgProto) {log.info("facebook 接收到消息: {}", msgProto.getMsg());...}@Overridepublic void onError(Throwable throwable) {log.info("Error occurred: {}", throwable.getMessage());...}@Overridepublic void onCompleted() {log.info("Stream completed.");...}};log.info("接收msg 开始");ClientManager.getMqDataServiceStub().receiveRedissonMsg(build, responseObserver);log.info("接收msg 成功");}catch (Exception e){log.info("出错了");}}~~~jedis和消息优化版:

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com