以下是一个结合现代C++特性和工业级最佳实践的线程池实现,支持任务返回值、优雅关闭、异常处理和高效资源管理
1
4
6
:
cpp
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <memory>class ThreadPool {
public:explicit ThreadPool(size_t threads = std::thread::hardware_concurrency()): stop(false){if(threads == 0) throw std::invalid_argument("Thread count cannot be zero");workers.reserve(threads);for(size_t i = 0; i < threads; ++i) {workers.emplace_back([this] {while(true) {std::function<void()> task;{std::unique_lock<std::mutex> lock(queue_mutex);condition.wait(lock, [this] { return stop || !tasks.empty(); });if(stop && tasks.empty()) return;task = std::move(tasks.front());tasks.pop();}try {task();} catch(...) {// 异常处理策略std::exception_ptr eptr = std::current_exception();// 可在此记录日志或执行其他处理}}});}}template<class F, class... Args>auto enqueue(F&& f, Args&&... args) -> std::future<typename std::invoke_result_t<F, Args...>>{using return_type = typename std::invoke_result_t<F, Args...>;if(stop) throw std::runtime_error("Enqueue on stopped ThreadPool");auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);tasks.emplace([task](){ (*task)(); });}condition.notify_one();return res;}~ThreadPool() {{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for(std::thread &worker : workers) {if(worker.joinable()) worker.join();}}private:std::vector<std::thread> workers;std::queue<std::function<void()>> tasks;std::mutex queue_mutex;std::condition_variable condition;bool stop;
};
核心特性说明
1
4
6
:
-
自动硬件并发检测
- 默认使用
std::thread::hardware_concurrency()
自动检测CPU核心数 - 可手动指定线程数量,支持动态调整(需扩展)
- 默认使用
-
完美转发任务参数
cpp
template<class F, class... Args> auto enqueue(F&& f, Args&&... args)
- 支持任意可调用对象(函数、lambda、仿函数等)
- 使用
std::bind
保持参数类型完整性
-
异步返回值处理
- 通过
std::packaged_task
和std::future
实现
cpp
auto res = task->get_future();
- 允许调用者通过future获取任务执行结果
- 通过
-
异常安全设计
- 任务执行时的异常捕获机制
cpp
try {task(); } catch(...) {std::exception_ptr eptr = std::current_exception(); }
- 可扩展添加异常日志记录或自定义处理逻辑
-
优雅关闭机制
cpp
~ThreadPool() {stop = true;condition.notify_all();// ...等待线程结束 }
- 保证所有已入队任务执行完毕
- 避免资源泄漏的严格线程join机制
使用示例
6
:
cpp
int main() {ThreadPool pool(4); // 4个工作线程std::vector<std::future<int>> results;// 提交任务for(int i = 0; i < 8; ++i) {results.emplace_back(pool.enqueue([i] {std::this_thread::sleep_for(std::chrono::seconds(1));return i*i;}));}// 获取结果for(auto&& result : results)std::cout << result.get() << ' ';return 0;
}
高级扩展方向
4
7
:
-
动态线程调整
可根据队列负载动态增加/减少工作线程 -
优先级队列
使用优先队列实现任务优先级调度 -
任务超时机制
为任务添加超时检测和自动取消功能 -
工作窃取算法
实现work-stealing机制提升负载均衡 -
监控统计接口
添加任务计数、等待时间等监控指标
该实现符合C++11/14/17标准,通过RAII确保资源安全,使用现代模板技术实现类型安全,可作为生产环境基础框架。建议根据具体业务需求进行扩展优化
2
6
。+