文章目录
- 先定义具体的业务请求类型
- 2. 实现服务端提供的服务
- protobuf_server.cpp
- protobuf_client.cpp
建议先去了解muduo库和protobuf协议:
- Protobuf库的使用
- Muduo库介绍及使用
先定义具体的业务请求类型
先使用protobuf库创建我们所要完成的业务请求类型,英译汉和加法服务器和客⼾端。
创建request.proto
syntax = "proto3";
package nzq;
//接下来定义rpc翻译请求信息结构
message TranslateRequest {string msg = 1;
}
//接下来定义rpc翻译响应信息结构
message TranslateResponse {string msg = 1;
}
//定义rpc加法请求信息结构
message AddRequest {uint32 num1 = 1;uint32 num2 = 2;
}
//定义rpc加法响应信息结构
message AddResponse {uint32 result = 1;
}
protoc --cpp_out=. request.proto
2. 实现服务端提供的服务
在实现具体服务前,先介绍⼀下muduo库中内部实现的关于简单的基于protobuf的接⼝类
ProtobufCodec类是muduo库中对于protobuf协议的处理类,其内部实现了onMessage回调接,对于接收到的数据进⾏基于protobuf协议的请求处理,然后将解析出的信息,存放到对应请求的protobuf请求类对象中,然后最终调⽤设置进去的消息处理回调函数进⾏对应请求的处理。
/*muduo-master/examples/protobuf/codec.h*/
typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
//
// FIXME: merge with RpcCodec
//
class ProtobufCodec : muduo::noncopyable
{
public:enum ErrorCode{kNoError = 0,kInvalidLength,kCheckSumError,kInvalidNameLen,kUnknownMessageType,kParseError,};typedef std::function<void(const muduo::net::TcpConnectionPtr &,const MessagePtr &,muduo::Timestamp)>ProtobufMessageCallback;// 这⾥的messageCb是针对protobuf请求进⾏处理的函数,它声明在dispatcher.h中的ProtobufDispatcher类 explicit ProtobufCodec(const ProtobufMessageCallback &messageCb): messageCallback_(messageCb), // 这就是设置的请求处理回调函数errorCallback_(defaultErrorCallback){}// 它的功能就是接收消息,进⾏解析,得到了proto中定义的请求后调⽤设置的messageCallback_进⾏处理void onMessage(const muduo::net::TcpConnectionPtr &conn,muduo::net::Buffer *buf,muduo::Timestamp receiveTime);// 通过conn对象发送响应的接⼝void send(const muduo::net::TcpConnectionPtr &conn,const google::protobuf::Message &message){// FIXME: serialize to TcpConnection::outputBuffer()muduo::net::Buffer buf;fillEmptyBuffer(&buf, message);conn->send(&buf);}static const muduo::string &errorCodeToString(ErrorCode errorCode);static void fillEmptyBuffer(muduo::net::Buffer *buf, const google::protobuf::Message &message);static google::protobuf::Message *createMessage(const std::string &type_name);static MessagePtr parse(const char *buf, int len, ErrorCode *errorCode);private:static void defaultErrorCallback(const muduo::net::TcpConnectionPtr &,muduo::net::Buffer *,muduo::Timestamp,ErrorCode);ProtobufMessageCallback messageCallback_;ErrorCallback errorCallback_;const static int kHeaderLen = sizeof(int32_t);const static int kMinMessageLen = 2 * kHeaderLen + 2; // nameLen + typeName +checkSum const static int kMaxMessageLen = 64 * 1024 * 1024; // same as codec_stream.hkDefaultTotalBytesLimit
};
};
ProtobufDispatcher类,这个类就⽐较重要了,这是⼀个protobuf请求的分发处理类,我们⽤⼾在使⽤的时候,就是在这个类对象中注册哪个请求应该⽤哪个业务函数进⾏处理。
它内部的onProtobufMessage接⼝就是给上边ProtobufCodec::messageCallback_设置的回调函数,相当于ProtobufCodec中onMessage接⼝会设置给服务器作为消息回调函数,其内部对于接收到的数据进⾏基于protobuf协议的解析,得到请求后,通过ProtobufDispatcher::onProtobufMessage接⼝进⾏请求分发处理,也就是确定当前请求应该⽤哪⼀个注册的业务函数进⾏处理。
typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
class Callback : muduo::noncopyable
{
public:virtual ~Callback() = default;virtual void onMessage(const muduo::net::TcpConnectionPtr &,const MessagePtr &message,muduo::Timestamp) const = 0;
};
// 这是⼀个对函数接⼝进⾏⼆次封装⽣成⼀个统⼀类型对象的类
template <typename T>
class CallbackT : public Callback
{static_assert(std::is_base_of<google::protobuf::Message, T>::value,"T must be derived from gpb::Message.");public:typedef std::function<void(const muduo::net::TcpConnectionPtr &,const std::shared_ptr<T> &message,muduo::Timestamp)>ProtobufMessageTCallback;CallbackT(const ProtobufMessageTCallback &callback): callback_(callback){}void onMessage(const muduo::net::TcpConnectionPtr &conn,const MessagePtr &message,muduo::Timestamp receiveTime) const override{std::shared_ptr<T> concrete = muduo::down_pointer_cast<T>(message);assert(concrete != NULL);callback_(conn, concrete, receiveTime);}private:ProtobufMessageTCallback callback_;
};
// 这是⼀个protobuf请求分发器类,需要⽤⼾注册不同请求的不同处理函数,
// 注册完毕后,服务器收到指定请求就会使⽤对应接⼝进⾏处理
class ProtobufDispatcher
{
public:typedef std::function<void(const muduo::net::TcpConnectionPtr &,const MessagePtr &message,muduo::Timestamp)>ProtobufMessageCallback;// 构造对象时需要传⼊⼀个默认的业务处理函数,以便于找不到对应请求的处理函数时调⽤。explicit ProtobufDispatcher(const ProtobufMessageCallback &defaultCb): defaultCallback_(defaultCb){}// 这个是⼈家实现的针对proto中定义的类型请求进⾏处理的函数,内部会调⽤我们⾃⼰传⼊的业务处理函数void onProtobufMessage(const muduo::net::TcpConnectionPtr &conn,const MessagePtr &message,muduo::Timestamp receiveTime) const{CallbackMap::const_iterator it = callbacks_.find(message->GetDescriptor());if (it != callbacks_.end()){it->second->onMessage(conn, message, receiveTime);}else{defaultCallback_(conn, message, receiveTime);}}/*这个接⼝⾮常巧妙,基于proto中的请求类型将我们⾃⼰的业务处理函数与对应的请求给关联起来了相当于通过这个成员变量中的CallbackMap能够知道收到什么请求后应该⽤什么处理函数进⾏处理简单理解就是注册针对哪种请求--应该⽤哪个我们⾃⼰的函数进⾏处理的映射关系但是我们⾃⼰实现的函数中,参数类型都是不⼀样的⽐如翻译有翻译的请求类型,加法有加法请求类型⽽map需要统⼀的类型,这样就不好整了,所以⽤CallbackT对我们传⼊的接⼝进⾏了⼆次封装。*/template <typename T>void registerMessageCallback(const typename CallbackT<T>::ProtobufMessageTCallback &callback){std::shared_ptr<CallbackT<T>> pd(new CallbackT<T>(callback));callbacks_[T::descriptor()] = pd;}private:typedef std::map<const google::protobuf::Descriptor *,std::shared_ptr<Callback>>CallbackMap;CallbackMap callbacks_;ProtobufMessageCallback defaultCallback_;
};
⽽能实现请求与函数之间的映射,还有⼀个⾮常重要的元素:那就是应⽤层协议
protobuf根据我们的proto⽂件⽣成的代码中,会⽣成对应类型的类,⽐如TranslateRequest对应了⼀个TranslateRequest类,⽽不仅仅如此,protobuf⽐我们想象中做的事情更多,每个对应的类中,都包含有⼀个描述结构的指针:
这个描述结构⾮常重要,其内部可以获取到当前对应类类型名称,以及各项成员的名称,因此通过这些名称,加上协议中的typename字段,就可以实现完美的对应关系了.
protobuf_server.cpp
服务端同之前实现的muduo库的翻译服务器区别不大,加上protobuf协议后实际上就多了两个类成员:
- 请求分发器对象–要向其中注册请求处理函数 ProtobufDispatcher _dispatcher;
- protobuf协议处理器–针对收到的请求数据进行protobuf协议处理 rotobufCodec _codec;
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"#include "request.pb.h"
#include <iostream>
#include <unordered_map>class Server {public:typedef std::shared_ptr<google::protobuf::Message> MessagePtr;typedef std::shared_ptr<nzq::TranslateRequest> TranslateRequestPtr;typedef std::shared_ptr<nzq::AddRequest> AddRequestPtr;Server(int port): _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "Server", muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&Server::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)){//注册业务请求处理函数//messagecallback有两个是因为要完成翻译和加法两个业务_dispatcher.registerMessageCallback<nzq::TranslateRequest>(std::bind(&Server::onTranslate, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<nzq::AddRequest>(std::bind(&Server::onAdd, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.setConnectionCallback(std::bind(&Server::onConnection, this, std::placeholders::_1));}void start() {_server.start();_baseloop.loop();}private:std::string translate(const std::string &str) {static std::unordered_map<std::string, std::string> dict_map = {{"hello", "你好"},{"Hello", "你好"},{"你好", "Hello"},{"吃了吗", "油泼面"}};auto it = dict_map.find(str);if (it == dict_map.end()) {return "没听懂!!";}return it->second;}void onTranslate(const muduo::net::TcpConnectionPtr& conn, const TranslateRequestPtr& message, muduo::Timestamp) {//1. 提取message中的有效消息,也就是需要翻译的内容std::string req_msg = message->msg();//2. 进行翻译,得到结果std::string rsp_msg = translate(req_msg);//3. 组织protobuf的响应nzq::TranslateResponse resp;resp.set_msg(rsp_msg);//4. 发送响应_codec.send(conn, resp);}void onAdd(const muduo::net::TcpConnectionPtr& conn, const AddRequestPtr& message, muduo::Timestamp) {int num1 = message->num1();int num2 = message->num2();int result = num1 + num2;nzq::AddResponse resp;resp.set_result(result);_codec.send(conn, resp);}void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp) {LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}void onConnection(const muduo::net::TcpConnectionPtr &conn) {if (conn->connected()) {LOG_INFO << "新连接建立成功!";}else {LOG_INFO << "连接即将关闭!";}}private:muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;//服务器对象ProtobufDispatcher _dispatcher;//请求分发器对象--要向其中注册请求处理函数ProtobufCodec _codec;//protobuf协议处理器--针对收到的请求数据进行protobuf协议处理
};int main()
{Server server(8085);server.start();return 0;
}
protobuf_client.cpp
服务端同之前实现的muduo库的翻译服务器区别不大,加上protobuf协议后实际上就多了两个类成员:
- 请求分发器对象–要向其中注册请求处理函数 ProtobufDispatcher _dispatcher;
- protobuf协议处理器–针对收到的请求数据进行protobuf协议处理 rotobufCodec _codec;
#include "muduo/proto/dispatcher.h"
#include "muduo/proto/codec.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/base/CountDownLatch.h"#include "request.pb.h"
#include <iostream>class Client {public:typedef std::shared_ptr<google::protobuf::Message> MessagePtr;typedef std::shared_ptr<nzq::AddResponse> AddResponsePtr;typedef std::shared_ptr<nzq::TranslateResponse> TranslateResponsePtr;Client(const std::string &sip, int sport):_latch(1), _client(_loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client"),_dispatcher(std::bind(&Client::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)){_dispatcher.registerMessageCallback<nzq::TranslateResponse>(std::bind(&Client::onTranslate, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<nzq::AddResponse>(std::bind(&Client::onAdd, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setConnectionCallback(std::bind(&Client::onConnection, this, std::placeholders::_1)); }void connect() {_client.connect();_latch.wait();//阻塞等待,直到连接建立成功}void Translate(const std::string &msg){bit::TranslateRequest req;req.set_msg(msg);send(&req);}void Add(int num1, int num2) {bit::AddRequest req;req.set_num1(num1);req.set_num2(num2);send(&req);}private:bool send(const google::protobuf::Message *message) {if (_conn->connected()) {//连接状态正常,再发送,否则就返回false_codec.send(_conn, *message);return true;}return false;} void onTranslate(const muduo::net::TcpConnectionPtr& conn, const TranslateResponsePtr& message, muduo::Timestamp) {std::cout << "翻译结果:" << message->msg() << std::endl;}void onAdd(const muduo::net::TcpConnectionPtr& conn, const AddResponsePtr& message, muduo::Timestamp) {std::cout << "加法结果:" << message->result() << std::endl;}void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp) {LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}void onConnection(const muduo::net::TcpConnectionPtr&conn){if (conn->connected()) {_latch.countDown();//唤醒主线程中的阻塞_conn = conn;}else {//连接关闭时的操作_conn.reset();}}private:muduo::CountDownLatch _latch;//实现同步的muduo::net::EventLoopThread _loopthread;//异步循环处理线程muduo::net::TcpConnectionPtr _conn;//客户端对应的连接muduo::net::TcpClient _client;//客户端ProtobufDispatcher _dispatcher;//请求分发器ProtobufCodec _codec;//协议处理器
};int main()
{Client client("127.0.0.1", 8085);client.connect();client.Translate("hello");client.Add(11, 22);sleep(1);return 0;
}