使用背景：向 Redis 队列发送消息，并从队列中消费消息（消费端使用 gRPC 的服务端流式调用实现）。
1. gRPC协议类定义
// Message carried through the MQ service, used both when sending a message
// and when streaming consumed messages back to the client.
message AdMsgProto {
  // Message body.
  optional string msg = 1;
  // Tag handed to the producer together with the message.
  optional string tag = 2;
  // Topic handed to the producer together with the message.
  optional string topic = 3;
}
2. service方法定义
// MQ service exposed over gRPC.
// Request/response types are aligned with the Java server implementation in
// this document, which overrides sendRedissonMsg(AdMsgProto,
// StreamObserver<ResultProto>) and receiveRedissonMsg(ReqDataProto,
// StreamObserver<AdMsgProto>). An rpc parameter must be a message type, so a
// bare "String" cannot be used as the request.
service MQDataService {
  // Unary call: publishes one message and returns a result code.
  rpc sendRedissonMsg(AdMsgProto) returns (ResultProto);
  // Server-streaming call: the request names the queue to consume from and
  // the server pushes each consumed message back on the stream.
  rpc receiveRedissonMsg(ReqDataProto) returns (stream AdMsgProto);
}
服务端写法
package com.mykkhw.mykkhw_data_mq.service.grpc;import com.mykkhw.mykkhw_data_protocols.Base.ReqDataProto;
import com.mykkhw.mykkhw_data_protocols.Base.ResultProto;
import com.mykkhw.mykkhw_data_protocols.Base.ResultType;
import com.mykkhw.mykkhw_data_protocols.MQ.TiktokMsgProto;
import com.mykkhw.mykkhw_data_protocols.MQService.MQDataServiceGrpc;
import io.grpc.stub.StreamObserver;
import org.redisson.api.RBlockingQueue;
import org.springframework.util.StringUtils;public class MqRpcService extends MQDataServiceGrpc.MQDataServiceImplBase {@Overridepublic void sendRedissonMsg(AdMsgProto request, StreamObserver<ResultProto> responseObserver) {RpcServicePools.mqProducer.sendMsg(request.getMsg(), request.getTag(), request.getTopic());ResultProto.Builder builder = ResultProto.newBuilder();builder.setCode(ResultType.SUCCESS);responseObserver.onNext(builder.build());responseObserver.onCompleted();}@Overridepublic void receiveRedissonMsg(ReqDataProto request, StreamObserver<AdMsgProto> responseObserver) {try {RBlockingQueue<String> queue = RpcServicePools.redisson.getBlockingQueue(request.getName());// 循环处理消息while (!Thread.currentThread().isInterrupted()) {// 阻塞式获取消息,没有消息时线程会等待String message = queue.take();if(StringUtils.hasText(message)){AdMsgProto.Builder builder = AdMsgProto.newBuilder();builder.setMsg(message);...responseObserver.onNext(builder.build());}}} catch (Exception e) {Thread.currentThread().interrupt(); // 重新设置线程的中断标志}responseObserver.onCompleted();}
}//mq依赖
<!-- Redisson: provides org.redisson.api.RBlockingQueue used by the server code. -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.15.0</version>
    <scope>compile</scope>
</dependency>
客户端写法
// 消息生产者private void buildAppMSg(...param) {...// 发送mq消息ClientManager.getMqDataServiceFutureStub().sendRedissonMsg(adMsg.build());}//消费者流式接收public static void receiveFacebookMsg() {try {log.info("facebook msg");// 处理服务器流式响应StreamObserver<AdMsgProto> responseObserver = new StreamObserver<AdMsgProto>() {@Overridepublic void onNext(AdMsgProto msgProto) {log.info("facebook 接收到消息: {}", msgProto.getMsg());...}@Overridepublic void onError(Throwable throwable) {log.info("Error occurred: {}", throwable.getMessage());...}@Overridepublic void onCompleted() {log.info("Stream completed.");...}};log.info("接收msg 开始");ClientManager.getMqDataServiceStub().receiveRedissonMsg(build, responseObserver);log.info("接收msg 成功");}catch (Exception e){log.info("出错了");}}~~~jedis和消息优化版: