文章目录
- 一.实现要点
- 服务接口返回和收到响应的同步
- 二.代码实现
信道是提供服务的窗口,服务端的信道给用户提供服务
一.实现要点
服务接口返回和收到响应的同步
信道提供的服务接口主要任务就是,根据用户传入的参数,构建请求,然后发送,并等接收到响应后再返回。
关键 muduo 库中的接口都是非阻塞的,send 一调立马就返回,而我们是想接收到相应的响应后再返回,所以这里的同步需要我们自己实现。
我的做法是,信道内维护一个哈希表,key 是响应的 id,value 是响应。这个哈希表哪些地方会访问呢?第一,发送完请求后,会访问哈希表判断是否存在对应的响应(用请求的 id 来判断,因为请求和对应的响应,它们的 id 是相同的),如果存在了就直接返回响应,如果不能存在就去一个条件变量上等待。第二,收到来自服务端的推送消息的响应时,会将响应添加到哈希表,然后唤醒在条件变量上等待的线程
二.代码实现
#pragma once
#include "Consumer.hpp"
#include "muduo/net/TcpConnection.h"
#include "muduo/protobuf/codec.h"
#include "../common/ThreadPool.hpp"
#include "../common/protocol.pb.h"
#include "../common/Util.hpp"
#include "../common/message.pb.h"
#include <mutex>
#include <condition_variable>namespace ns_channel
{class Channel;using ChannelPtr = std::shared_ptr<Channel>;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using CommonResponsePtr = std::shared_ptr<ns_protocol::CommomResponse>;struct Channel{std::string _id;std::unordered_map<std::string, ns_consumer::ConsumerPtr> _consumers; //<队列名,消费者>muduo::net::TcpConnectionPtr _connPtr;ProtobufCodecPtr _codecPtr; // 构建响应后要添加协议数据std::mutex _mtxForResps;std::mutex _mtxForConsumers;std::condition_variable _cond;std::unordered_map<std::string, CommonResponsePtr> _resps;Channel(const std::string &id, const muduo::net::TcpConnectionPtr &connPtr,const ProtobufCodecPtr &codecPtr): _id(id),_consumers(),_connPtr(connPtr),_codecPtr(codecPtr),_mtxForResps(),_mtxForConsumers(),_cond(),_resps(){}// 以下两个接口是为了给服务端发送请求,因为Connection模块不想再重复设计同步机制,所以发送响应的动作由信道模块完成bool openChannel(){ns_protocol::OpenChannelRequest req;req.set_channel_id(_id);req.set_request_id(ns_util::UUIDUtil::uuid());_codecPtr->send(_connPtr, req);auto commonRespPtr = waitCommonResponse(req.request_id());LOG(DEBUG) << "打开信道:channelId: " << _id << endl;return commonRespPtr->ok();}bool closeChannel(){ns_protocol::CloseChannelRequest req;req.set_channel_id(_id);req.set_request_id(ns_util::UUIDUtil::uuid());_codecPtr->send(_connPtr, req);auto commonRespPtr = waitCommonResponse(req.request_id());LOG(DEBUG) << "关闭信道:channelId: " << _id << endl;return commonRespPtr->ok();}/************* 以下用于处理生产客户端的请求* ***********/bool declareExchange(const std::string &exchangeName, ns_protocol::ExchangeType type, bool isDurable){ns_protocol::DeclareExchangeRequest req;req.set_channel_id(_id);req.set_request_id(ns_util::UUIDUtil::uuid());req.set_exchange_name(exchangeName);req.set_exchange_type(type);req.set_is_durable(isDurable);_codecPtr->send(_connPtr, req);auto commonRespPtr = waitCommonResponse(req.request_id());LOG(DEBUG) << "声明交换机, exchangeName: " << exchangeName << endl;return commonRespPtr->ok();}bool deleteExchange(const std::string &exchangeName){ns_protocol::DeleteExchangeRequest req;req.set_channel_id(_id);req.set_request_id(ns_util::UUIDUtil::uuid());_codecPtr->send(_connPtr, req);auto commonRespPtr = waitCommonResponse(req.request_id());LOG(DEBUG) << "删除信道, exchangeName: " << exchangeName << endl;return commonRespPtr->ok();}/************** 声明队列* 记得要初始化队列消费者管理句柄* ***********/bool declareMsgQueue(const std::string &qname, bool isDurable){ns_protocol::DeclareMsgQueueRequest req;req.set_channel_id(_id);req.set_request_id(ns_util::UUIDUtil::uuid());req.set_queue_name(qname);req.set_is_durable(isDurable);_codecPtr->send(_connPtr, req);auto commonRespPtr = waitCommonResponse(req.request_id());LOG(DEBUG) << "声明队列, queueName: " << qname << endl;return commonRespPtr->ok();}/**************** 删除队列* 记得要删除队列关联的消费者* *************/bool deleteMsgQueue(const std::string &qname){ns_protocol::DeleteMsgQueueRequest req;req.set_channel_id(_id);req.set_request_id(ns_util::UUIDUtil::uuid());req.set_queue_name(qname);_codecPtr->send(_connPtr, req);auto commonRespPtr = waitCommonResponse(req.request_id());LOG(DEBUG) << "删除队列, queueName: " << qname << endl;return commonRespPtr->ok();}/********** * 绑定与解绑* ************/bool bind(const std::string &ename, const std::string &qname, const std::string &bindingKey){ns_protocol::BindRequest req;req.set_channel_id(_id);req.set_request_id(ns_util::UUIDUtil::uuid());req.set_qname(qname);req.set_ename(ename);req.set_binding_key(bindingKey);_codecPtr->send(_connPtr, req);auto commonRespPtr = waitCommonResponse(req.request_id());LOG(DEBUG) << "绑定: " << ename << "->" << qname << endl;return commonRespPtr->ok();}bool unbind(const std::string &ename, const std::string &qname){ns_protocol::UnbindRequest req;req.set_channel_id(_id);req.set_request_id(ns_util::UUIDUtil::uuid());req.set_qname(qname);req.set_ename(ename);_codecPtr->send(_connPtr, req);auto commonRespPtr = waitCommonResponse(req.request_id());LOG(DEBUG) << "解绑: " << ename << "->" << qname << endl;return commonRespPtr->ok();}bool publishMessage(const std::string &ename, const std::string &routingKey,ns_data::DeliveryMode mode, const std::string &body){ns_protocol::PublishMessageRequest req;req.set_channel_id(_id);req.set_request_id(ns_util::UUIDUtil::uuid());req.set_exchange_name(ename);ns_data::Message msg;msg.mutable_saved_info()->set_id(ns_util::UUIDUtil::uuid());msg.mutable_saved_info()->set_routing_key(routingKey);msg.mutable_saved_info()->set_delivery_mode(mode);msg.mutable_saved_info()->set_body(body);req.mutable_msg()->Swap(&msg);_codecPtr->send(_connPtr, req);auto commonRespPtr = waitCommonResponse(req.request_id());LOG(DEBUG) << "publish message: " << body << endl;return commonRespPtr->ok();}/************ 以下用于处理消费客户端请求* **************/bool subscribeQueue(const std::string &qname, bool autoAck, ns_consumer::ConsumerCallback_t callback){std::unique_lock<std::mutex> lck(_mtxForConsumers);if (_consumers.count(qname)){LOG(INFO) << "this queue has been subscribed, qname: " << qname;return true;}ns_protocol::SubscribeQueueRequest req;req.set_channel_id(_id);req.set_request_id(ns_util::UUIDUtil::uuid());req.set_qname(qname);req.set_consumer_id(ns_util::UUIDUtil::uuid());req.set_auto_ack(autoAck);_consumers[req.qname()] = std::make_shared<ns_consumer::Consumer>(req.consumer_id(),qname, callback, autoAck);_codecPtr->send(_connPtr, req);auto commonRespPtr = waitCommonResponse(req.request_id());LOG(DEBUG) << "订阅队列" << ", qname: " << qname << endl;return commonRespPtr->ok();}bool cancelSubscribe(const std::string &qname){std::unique_lock<std::mutex> lck(_mtxForConsumers);if (_consumers.count(qname) == 0){return true;}auto consumerPtr = _consumers[qname];ns_protocol::CancelSubscribeRequest req;req.set_channel_id(_id);req.set_request_id(ns_util::UUIDUtil::uuid());req.set_qname(qname);req.set_consumer_id(consumerPtr->_id);_codecPtr->send(_connPtr, req);_consumers.erase(qname);auto commonRespPtr = waitCommonResponse(req.request_id());LOG(DEBUG) << "取消订阅队列" << ", qname: " << qname << endl;return commonRespPtr->ok();}bool ackMessage(const std::string &qname, const std::string &msgId){ns_protocol::AckRequest req;req.set_channel_id(_id);req.set_request_id(ns_util::UUIDUtil::uuid());req.set_qname(qname);req.set_msg_id(msgId);_codecPtr->send(_connPtr, req);auto commonRespPtr = waitCommonResponse(req.request_id());LOG(DEBUG) << "应答消息, msgId: " << msgId << endl;return commonRespPtr->ok();}// 我们想要收到响应后这些给用户提供服务的接口才返回,所以需要同步策略void putCommonResponse(const CommonResponsePtr &respPtr){std::unique_lock<std::mutex> lck(_mtxForResps);_resps[respPtr->response_id()] = respPtr;_cond.notify_all();}// 让消费者处理消息void consumeMessage(const std::string qname, const ns_data::Message &msg){{std::unique_lock<std::mutex> lck(_mtxForConsumers);if (_consumers.count(qname) == 0){LOG(WARNING) << "该消费者不存在" << endl;return;}}_consumers[qname]->_callback(msg);}private:CommonResponsePtr waitCommonResponse(const std::string &reqId){std::unique_lock<std::mutex> lck(_mtxForResps);while (_resps.count(reqId) == 0){_cond.wait(lck);}auto commonRespPtr = _resps[reqId];_resps.erase(reqId);return commonRespPtr;}};/******************************* 信道管理句柄,注意以Connection为单元进行管理* ***************************/class ChannelManager{private:std::mutex _mtx;std::unordered_map<std::string, ChannelPtr> _channels;public:ChannelPtr openChannel(const muduo::net::TcpConnectionPtr &connPtr,const ProtobufCodecPtr &codecPtr){std::unique_lock<std::mutex> lck(_mtx);std::string channelId = ns_util::UUIDUtil::uuid();auto channelPtr = std::make_shared<Channel>(channelId, connPtr, codecPtr);_channels[channelId] = channelPtr;return channelPtr;}void closeChannel(const std::string &id){std::unique_lock<std::mutex> lck(_mtx);_channels.erase(id);}ChannelPtr getChannel(const std::string &id){std::unique_lock<std::mutex> lck(_mtx);if (_channels.count(id) == 0){LOG(WARNING) << "信道不存在, channelId: " << id;return nullptr;}return _channels[id];}};
}