muduo库的使用
- 1. 下载安装mudu库
- 2. muduo库的原理
- 3. muduo库常见接口介绍
- 3.1 TcpServer类基础介绍
- 3.2 EventLoop类基础介绍
- 3.3TcpConnection类基础介绍
- 3.4 Buffer类基础介绍
- 3.5 TcpClient类基础介绍
- 4. 示例加深理解
- 4.1 服务端
- 4.2 客户端
- 4.3 Makefile编写
1. 下载安装mudu库
- git方法
git clone https://github.com/chenshuo/muduo.git
- 安装依赖环境
sudo apt-get install libz-dev libboost-all-dev
- 进入目录运行脚本编译安装
[……]$ ./build.sh
[……]$ ./build.sh install
此时会在同级目录下生成一个build目录
2. muduo库的原理
Muduoku
是由陈硕大佬开发的一个基于非阻塞IO和时间驱动的C++高并发TCP网络编程库。它是一款使用epoll多路转接,基于主从Reactor模型的网络库,其使用的线程模型是one loop per thread
,所谓one loop per thread
指的是:
- 一个线程只能有一个事件循环(
EventLoop)
,用于响应计时器和IO事件。 - 一个文件描述符只能由一个线程进行读写,换句话说就是一个TCP连接必须属于某个
EventLoop
管理
在我们之前我们讲解的select,poll,epoll
这些多路转接模型的工作原理都是基于单线程的事件监听。将listen_socket
添加到事件监控中,然后根据listen_socket
监听,一旦由client进行连接就创建socket
并也把这个socket
添加到事件监听中,然后就可以对已添加的socket
进行事件监控,一旦触发了某个事件就对其进行处理。这种通过多路转接模型进行事件触发的IO处理的模型就叫做Reactor模型。但是这也会带来很大的缺陷,前面我们说过了他们都是基于单线程实现的,也就是说执行流只会有一个,如果client
有大量的请求连接的话,假如同一时间有1000个请求,那么第1000个请求就需要等带前999个请求处理完后才会执行第1000个请求,而没处理一个请求时需要花费一定时间的,这个效率对用户的体验是非常差的,所以对于高并发场景的单个Reactor
模型是远远不够的。
所以muduo
库就诞生了,muduo
库的模型是由一个主Reactor
和多个子Reactor
组成的,每个Reactor
由一个线程进行维护,主Reactor
负责监听新的连接请求,并将新连接分发给子Reactor
。主Reactor
使用epoll机制监听端口,当有新的连接请求时,将其分配给某个子Reactor
。子Reactor负责处理分配给它的连接的所有IO操作。每个子Reactor
在一个独立的线程中运行,减少了线程间的上下文切换,提高了效率
3. muduo库常见接口介绍
3.1 TcpServer类基础介绍
主要代码逻辑框架
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::function<void(const TcpConnectionPtr &)> ConnectionCallback;
typedef std::function<void(const TcpConnectionPtr &,Buffer *,Timestamp)>MessageCallback;
class InetAddress : public muduo::copyable
{
public:InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
};
class TcpServer : noncopyable
{
public:enum Option{kNoReusePort,kReusePort,};TcpServer(EventLoop *loop,const InetAddress &listenAddr,const string &nameArg,Option option = kNoReusePort); // 第四个参数:是否设置为地址重用// 设置线程数量,也就是设置几个Reactor,默认是只有一个线程也就是主线程void setThreadNum(int numThreads);// 开始时间监控,获取新的连接void start();/// 当⼀个新连接建⽴成功的时候被调⽤(一个连接建立成功/断开时执行,比如一个广播聊天,一个用户上线里就通知某某上线了,当他线下了也通知某某下线了。)void setConnectionCallback(const ConnectionCallback &cb){connectionCallback_ = cb;}/// 消息的业务处理回调函数---这是收到新连接消息的时候被调⽤的函数void setMessageCallback(const MessageCallback &cb){messageCallback_ = cb;}
};
3.2 EventLoop类基础介绍
class EventLoop : noncopyable
{
public: // 开始事件循环,直到调用quit()方法为止 void loop(); // 请求退出事件循环 void quit(); // 在指定的时间运行回调函数一次 // 参数time是回调应该被调用的时间戳 // 参数cb是当时间到达时应该被调用的回调函数 // 返回TimerId,可用于取消定时器 TimerId runAt(Timestamp time, TimerCallback cb); // 在当前时间加上指定的延迟后运行回调函数一次 // 参数delay是延迟时间(秒) // 参数cb是当延迟时间过后应该被调用的回调函数 // 返回TimerId,可用于取消定时器 TimerId runAfter(double delay, TimerCallback cb); // 每隔指定的时间间隔重复运行回调函数 // 参数interval是时间间隔(秒) // 参数cb是每隔interval秒应该被调用的回调函数 // 返回TimerId,可用于取消定时器 TimerId runEvery(double interval, TimerCallback cb); // 取消指定的定时器 // 参数timerId是之前通过runAt、runAfter或runEvery方法返回的定时器ID void cancel(TimerId timerId); private: // 原子变量,用于指示事件循环是否应该退出 std::atomic<bool> quit_;// 指向Poller对象的智能指针,Poller负责轮询I/O事件std::unique_ptr<Poller> poller_; // 互斥锁,用于保护多线程访问共享数据 mutable MutexLock mutex_; // 存储待执行函数的向量,这些函数将在事件循环的某个点被执行std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
};
3.3TcpConnection类基础介绍
class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
{
public: // 构造函数,用于创建TcpConnection对象 // 参数包括事件循环指针、连接名称、套接字文件描述符、本地地址和远程地址 TcpConnection(EventLoop* loop, const string& name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr); // 检查连接是否已建立 bool connected() const { return state_ == kConnected; } // 检查连接是否已断开 bool disconnected() const { return state_ == kDisconnected; } // 发送字符串消息(使用C++11的移动语义) void send(string&& message); // 发送原始数据 void send(const void* message, int len); // 使用StringPiece发送消息(StringPiece是Google的字符串切片类,用于高效处理字符串片段) void send(const StringPiece& message); // 发送Buffer对象中的数据 void send(Buffer* message); // 关闭连接 void shutdown(); // 设置连接上下文,上下文可以是任意类型的数据,通过boost::any存储 void setContext(const boost::any& context) { context_ = context; } // 获取连接上下文(常量引用) const boost::any& getContext() const { return context_; } // 获取连接上下文的可修改指针(注意:这可能不是线程安全的,使用时需要小心) boost::any* getMutableContext() { return &context_; } // 设置连接建立时的回调函数 void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; } // 设置接收到消息时的回调函数 void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; } private: // 连接的状态枚举 enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting }; // 指向事件循环的指针,用于在连接上执行定时任务或异步操作 EventLoop* loop_; // 连接建立时的回调函数 ConnectionCallback connectionCallback_; // 接收到消息时的回调函数 MessageCallback messageCallback_; // 发送完成时的回调函数WriteCompleteCallback writeCompleteCallback_; // 上下文信息,可以是任意类型的数据,通过boost::any存储 boost::any context_; // 连接的状态StateE state_; };
3.4 Buffer类基础介绍
// 缓冲区类
// Buffer 类是一个字节缓冲区类,支持从两端读写数据,以及处理整数和基本字符串。
// 它继承自 muduo::copyable,表明这个类是可以被拷贝的。
class Buffer : public muduo::copyable
{
public: // 定义了一个便宜的前置空间大小,用于优化读操作。 static const size_t kCheapPrepend = 8; // 定义了缓冲区的初始大小。 static const size_t kInitialSize = 1024; // 构造函数,接受一个可选的初始大小参数。 // 缓冲区实际大小为 kCheapPrepend + initialSize,其中 kCheapPrepend 用于优化读操作。 explicit Buffer(size_t initialSize = kInitialSize) : buffer_(kCheapPrepend + initialSize), readerIndex_(kCheapPrepend), writerIndex_(kCheapPrepend) {} // 与另一个Buffer对象交换内容。 void swap(Buffer& rhs); // 返回可读字节数,即 writerIndex_ - readerIndex_。 size_t readableBytes() const; // 返回可写字节数,即 buffer_.size() - writerIndex_。 size_t writableBytes() const; // 返回一个指向可读数据的指针,但不移动读写索引。 const char* peek() const; // 查找并返回指向缓冲区中第一个EOL(如"\r\n")的指针,从头开始搜索。 const char* findEOL() const; // 查找并返回指向缓冲区中从指定位置开始的第一个EOL的指针。 const char* findEOL(const char* start) const; // 从缓冲区中移除指定长度的数据。 void retrieve(size_t len); // 移除并返回缓冲区中下一个 int64_t 类型的数据。 void retrieveInt64(); // 移除并返回缓冲区中下一个 int32_t 类型的数据。 void retrieveInt32(); // 移除并返回缓冲区中下一个 int16_t 类型的数据。 void retrieveInt16(); // 移除并返回缓冲区中下一个 int8_t 类型的数据。 void retrieveInt8(); // 移除并返回缓冲区中所有可读数据作为字符串。 string retrieveAllAsString(); // 移除并返回缓冲区中指定长度的数据作为字符串。 string retrieveAsString(size_t len); // 向缓冲区末尾追加 StringPiece 对象。 void append(const StringPiece& str); // 向缓冲区末尾追加指定长度的数据。 void append(const char* /*restrict*/ data, size_t len); // 向缓冲区末尾追加指定长度的数据(泛型版本)。 void append(const void* /*restrict*/ data, size_t len); // 返回一个指向缓冲区末尾(用于写入)的指针。 char* beginWrite(); // 返回一个指向缓冲区末尾(用于写入)的常量指针。 const char* beginWrite() const; // 更新写入索引,表示已经写入了指定长度的数据。 void hasWritten(size_t len); // 向缓冲区末尾追加一个 int64_t 类型的数据。 void appendInt64(int64_t x); // 向缓冲区末尾追加一个 int32_t 类型的数据。 void appendInt32(int32_t x); // 向缓冲区末尾追加一个 int16_t 类型的数据。 void appendInt16(int16_t x); // 向缓冲区末尾追加一个 int8_t 类型的数据。 void appendInt8(int8_t x); // 从缓冲区读取一个 int64_t 类型的数据,并移动读索引。 int64_t readInt64(); // 从缓冲区读取一个 int32_t 类型的数据,并移动读索引。 int32_t readInt32(); // 从缓冲区读取一个 int16_t 类型的数据,并移动读索引。 int16_t readInt16(); // 从缓冲区读取一个 int8_t 类型的数据,并移动读索引。 int8_t readInt8(); // 从缓冲区中查看(不移动读索引)下一个 int64_t 类型的数据。 int64_t peekInt64() const; // 从缓冲区中查看(不移动读索引)下一个 int32_t 类型的数据。 int32_t peekInt32() const; // 从缓冲区中查看(不移动读索引)下一个 int16_t 类型的数据。 int16_t peekInt16() const; // 从缓冲区中查看(不移动读索引)下一个 int8_t 类型的数据。 int8_t peekInt8() const; // 在缓冲区开头(readerIndex_ 之前)追加一个 int64_t 类型的数据。 void prependInt64(int64_t x); // 在缓冲区开头(readerIndex_ 之前)追加一个 int32_t 类型的数据。 void prependInt32(int32_t x); // 在缓冲区开头(readerIndex_ 之前)追加一个 int16_t 类型的数据。 void prependInt16(int16_t x); // 在缓冲区开头(readerIndex_ 之前)追加一个 int8_t 类型的数据。 void prependInt8(int8_t x); // 在缓冲区开头(readerIndex_ 之前)追加指定长度的数据。 void prepend(const void* /*restrict*/ data, size_t len); private: std::vector<char> buffer_; // 存储字节数据的向量。 size_t readerIndex_; // 读索引,指向下一个可读字节的位置。 size_t writerIndex_; // 写索引,指向下一个可写字节的位置。 static const char kCRLF[]; // 可能的行结束符,如 "\r\n"。
};
3.5 TcpClient类基础介绍
class TcpClient : noncopyable
{
public: // 构造函数,用于创建 TcpClient 对象。 // 需要提供事件循环指针、服务器地址和客户端名称。 TcpClient(EventLoop* loop, const InetAddress& serverAddr, const string& nameArg); // 析构函数,声明为 out-of-line(在类定义外部实现), // 以便处理 std::unique_ptr 成员(尽管在这个类的定义中没有直接显示)。 ~TcpClient(); // 连接到服务器。 void connect(); // 关闭连接。 void disconnect(); // 停止客户端操作,可能包括关闭连接和清理资源。 void stop(); // 获取客户端对应的通信连接 TcpConnection 对象的接口。 // 注意:在发起 connect 后,连接可能尚未建立成功。 TcpConnectionPtr connection() const { MutexLockGuard lock(mutex_); // 加锁以保护 connection_ return connection_; // 返回当前连接(如果有的话) } // 设置连接成功时的回调函数。 void setConnectionCallback(ConnectionCallback cb) { connectionCallback_ = std::move(cb); // 使用移动语义设置回调 } // 设置收到服务器发送的消息时的回调函数。 void setMessageCallback(MessageCallback cb) { messageCallback_ = std::move(cb); // 使用移动语义设置回调 } private: EventLoop* loop_; // 指向事件循环的指针,用于处理异步事件 ConnectionCallback connectionCallback_; // 连接成功时的回调函数 MessageCallback messageCallback_; // 收到消息时的回调函数 // 注意:WriteCompleteCallback 在此类的定义中没有直接出现,但可能在其他地方使用 TcpConnectionPtr connection_ GUARDED_BY(mutex_); // 当前连接(受 mutex_ 保护) mutable MutexLock mutex_; // 用于保护 connection_ 的互斥锁
}; // CountDownLatch 类是一个同步辅助类,用于让一个或多个线程等待直到其他线程的一系列操作完成。
// 它继承自 noncopyable 以防止被复制。
class CountDownLatch : noncopyable
{
public: // 构造函数,初始化计数器。 explicit CountDownLatch(int count); // 等待计数器变为零。如果计数器不为零,则当前线程将阻塞。 void wait() { MutexLockGuard lock(mutex_); // 加锁以保护 count_ 和 condition_ while (count_ > 0) // 如果计数器大于零,则等待 { condition_.wait(); // 释放锁并进入等待状态,直到被唤醒 } } // 将计数器减一。如果计数器变为零,则唤醒所有等待的线程。 void countDown() { MutexLockGuard lock(mutex_); // 加锁以保护 count_ 和 condition_ --count_; // 计数器减一 if (count_ == 0) // 如果计数器变为零 { condition_.notifyAll(); // 唤醒所有等待的线程 } } // 获取当前计数器的值(主要用于调试)。 int getCount() const; private: mutable MutexLock mutex_; // 用于保护 count_ 和 condition_ 的互斥锁 Condition condition_ GUARDED_BY(mutex_); // 条件变量,与 mutex_ 一起使用以实现等待/通知机制 int count_ GUARDED_BY(mutex_); // 计数器,表示需要等待的操作数量
};
4. 示例加深理解
编写一个简单的翻译模块。
4.1 服务端
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <iostream>
#include <string>
#include <unordered_map>class DictServer{
public:DictServer(int port = 9090):_server(&_baseloop, muduo::net::InetAddress("127.0.0.1",port), "DictServer", muduo::net::TcpServer::kReusePort)//第四个参数:是否启动地址重用{// 设置回调函数// 设置连接(连接建立/管理)事件的回调_server.setConnectionCallback(std::bind(&DictServer::onConnection, this, std::placeholders::_1));// 设置连接消息的回调_server.setMessageCallback(std::bind(&DictServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));}// 启动服务器void start(){_server.start();// 开始监听_baseloop.loop();// 开始事件循环监控}void onConnection(const muduo::net::TcpConnectionPtr& conn){if (conn->connected()){std::cout << "连接成功\n";}else{std::cout << "连接失败\n";}}void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp time){static std::unordered_map<std::string, std::string> dict_map = {{"hello", "你好"},{"world", "世界"},{"map","图"}};// 获取所有数据,不需要提供rceve接收消息的接口,直接从缓冲区中拿数据std::string msg = buf->retrieveAllAsString();auto it = dict_map.find(msg);std::string res;if (it != dict_map.end()){res = it->second;}else{res = "未知单词!";}conn->send(res);}
private:// 事件循环对象,用户处理网络事件muduo::net::EventLoop _baseloop; // 这个一定要声明在_server前面,因为需要用EventLoop来构造TcpServer // TCP服务对象,用于监听和接收连接muduo::net::TcpServer _server;
};int main()
{DictServer server(9090);server.start();return 0;
}
4.2 客户端
#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <muduo/base/CountDownLatch.h> // 用来同步连接,即连接成功后才开始发消息
#include <iostream>
#include <string>class DictClient
{
public:DictClient(const std::string& sip, int port) : _baseloop(_threadloop.startLoop()), // 这里threadloop返回的是一个EventLoop类型_client(_baseloop, muduo::net::InetAddress(sip, port), "DictClient"),_downlatch(1) // 这里初始化为1,为0时才会被唤醒{// 设置回调函数// 设置连接(连接建立/管理)事件的回调_client.setConnectionCallback(std::bind(&DictClient::onConnection, this, std::placeholders::_1));// 设置连接消息的回调_client.setMessageCallback(std::bind(&DictClient::onMessage,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_client.connect();_downlatch.wait(); // 阻塞等待连接成功后被唤醒//_baseloop.loop(); // 客户端不能这样写,因为loop是一个死循环,一旦这里执行了loop就一定不会走到send发送数据那一块// 所以这里要给baseloop.loop在一个新的线程里跑,muduo库提供了一个EventLoopThread可以做到// 此时我们就不需要使用_baseloop.loop();循环了EventLoopThread里面会帮我们去loop、}bool send(const std::string &msg){if (_conn->connected() == false){return false;std::cout << "连接已断开,发送数据失败" << std::endl;}_conn->send(msg);return true;}void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){std::cout << "连接成功\n";_downlatch.countDown(); // 连接成功计数器--,为0时唤醒阻塞_conn = conn;}else{std::cout << "连接失败\n";_conn.reset(); // 清空}}void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp time){std::string res = buf->retrieveAllAsString();std::cout << res << std::endl;}private:muduo::net::TcpConnectionPtr _conn; // TcpConnection的智能指针,用于存放连接对象muduo::CountDownLatch _downlatch; // 计数器用于实现同步muduo::net::EventLoopThread _threadloop; // 定义这个的时候EventLoopThread 内部执行startloop便可以触发EventLoop 的loop事件循环,muduo::net::EventLoop *_baseloop;指向EventLoopThread中EventLoop指针muduo::net::TcpClient _client; // TcpClient对象,用于连接和发送数据
};int main()
{DictClient client("127.0.0.1", 9090);while (true){std::string msg;std::cin >> msg;client.send(msg);}return 0;
}
这里我们可以跳转到startLoop的实现:
EventLoop* EventLoopThread::startLoop()
{assert(!thread_.started());thread_.start();EventLoop* loop = NULL;{MutexLockGuard lock(mutex_);while (loop_ == NULL){cond_.wait();}loop = loop_;}return loop;
}
在跳转到thread_.start()
void Thread::start()
{assert(!started_);started_ = true;// FIXME: move(func_)detail::ThreadData* data = new detail::ThreadData(func_, name_, &tid_, &latch_);if (pthread_create(&pthreadId_, NULL, &detail::startThread, data)) // 这里就会创建一个线程,这个线程实现的方法就是EventLoop的loop事件循环监控行为{started_ = false;delete data; // or no delete?LOG_SYSFATAL << "Failed in pthread_create";}else{latch_.wait();assert(tid_ > 0);}
}
void* startThread(void* obj)
{ThreadData* data = static_cast<ThreadData*>(obj);data->runInThread();delete data;return NULL;
}void runInThread()
{……func_();……
};这个func_()是一个回调函数,我们可以去看一下EventLoopThread的构造函数,构造了一个
void EventLoopThread::threadFunc()
{
……EventLoop loop;loop.loop(); // 这个就是
……
}
4.3 Makefile编写
CFLAG = -std=c++11 -I ../../build/release-install-cpp11/include/ #指定头文件路径(这里需要根据自身的情况具体填写)
LFLAG = -L ../../build/release-install-cpp11/lib -lmuduo_net -lmuduo_base -lpthread #指定包含库文件路径,注意这里lmuduo_net要先连接(这里需要根据自身的情况具体填写)
all:server client
server:server.cppg++ -o $@ $^ $(CFLAG) $(LFLAG)client:client.cppg++ -o $@ $^ $(CFLAG) $(LFLAG)
.PHONY:clean
clean:rm -fr server client