代码仓库

线程池基础

在处理大量并发任务时,传统逐任务创建线程的方式会产生大量线程创建与销毁的系统资源开销,同时增加线程上下文切换的额外损耗,线程池技术正是为缓解这类问题而生。线程池的基础运行逻辑是在系统内部预先创建一定数量的线程,当任务请求传入时,直接从池中调配已就绪线程处理任务;线程完成当前任务后不会被销毁,而是重新回归线程池待命,持续承接后续任务,实现线程的循环复用。

1

线程池具备多项实用特性,可适配多场景并发任务处理需求:其一为线程复用,通过维护固定数量的就绪线程重复执行任务,规避频繁创建销毁线程带来的资源损耗;其二是并发规模管控,针对多核处理器环境,可限定同时运行的线程数量,避免资源过度占用与线程竞争导致的运行效率下降;其三为任务排队机制,线程全量占用时,新任务会存入任务队列等待,队列可选择有界或无界类型,适配不同调度需求;其四是提升任务响应速度,任务到达后无需等待线程创建,可直接分配执行;其五支持线程状态管理,可设置线程属性、追踪线程运行状态、统计任务执行情况;其六实现系统资源管控,结合线程数量限制与队列机制,负载过高时可按配置调整线程数量,维持系统稳定运行;其七具备任务执行灵活性,可承接不同类型任务,支持普通提交、定时执行、周期性执行等多种调度方式。

借助C++11实现线程池,需要掌握多项核心相关知识:涵盖多线程基础理论,包括线程基础、线程同步、线程互斥、原子操作等内容;熟悉C++11标准并发库,涉及thread、mutex、condition_variable、atomic、unique_lock等核心组件;掌握C++11完美转发、lambda表达式的用法;熟练运用C++11智能指针;了解C++ STL容器库的基础操作。线程池适配主流两类开发环境,Windows平台可使用VS2019,Linux平台需搭配支持C++11及以上版本的g++编译器。

常见分类

线程池可按照应用场景划分为五种常见类型,各类线程池的运行逻辑、调度规则与适配场景各有差异,实际使用中可根据任务特性选择对应类型,最大化发挥并发效率。

  • 固定线程池:此类线程池的线程数量保持固定,线程创建后长期存活,不会随任务数量增减动态调整,超出线程承载能力的任务会存入队列等待。它适合任务量相对固定、单任务耗时较长的场景,能够稳定管控并发线程规模,避免系统资源波动,保障运行稳定性。
  • 缓存线程池:线程数量支持动态灵活调整,新任务到达时,优先复用空闲线程;若无空闲线程,则新建线程加入池中;线程闲置超过默认60秒后,会被自动移除并销毁,释放闲置资源。它适合任务提交量大、单任务耗时较短的场景,可快速响应突发任务,闲置线程回收能减少长期无效资源占用,提升资源利用率。
  • 单线程池:仅启用一个工作线程处理所有任务,可严格保障任务按照指定顺序执行,常见执行顺序包括先进先出、后进先出、优先级排序等。它适合需要严格保证任务执行次序的场景,彻底避免多线程并发导致的执行顺序混乱,确保任务执行逻辑可控。
  • 工作窃取线程池:内置多个独立任务队列,可创建足量线程匹配系统并行级别,通过工作窃取机制提升多核CPU利用率,避免处理器闲置。当前线程完成自身队列任务后,可从全局队列或其他线程的任务队列中调取任务执行,实现多线程负载均衡,充分发挥多核处理器的并行运算能力。
  • 计划线程池:也称作定时线程池或调度线程池,核心支持定时执行与周期性执行任务,可灵活设定任务执行的时间间隔或具体执行时间点。它适合需要定期执行的任务场景,比如定时数据备份、日志清理、定时邮件发送、系统心跳检测等常规运维类任务。

运行模式

线程池的运行主要分为两种经典模式,二者实现逻辑、调度方式与适用场景差异明显,可根据项目并发需求与业务特性选择适配模式。

领导者跟随者模式

领导者跟随者模式是线程池的经典运行模式之一,核心围绕线程状态划分与有序调度展开,全程维持线程状态的稳定切换,避免多线程同时抢占事件资源,有效减少线程竞争带来的额外开销。该模式下,线程池内的所有工作线程会被划分为三种固定状态,分别是领导者状态、追随者状态、工作者状态,且同一时刻线程池中仅有一个线程处于领导者状态,其余线程要么处于追随者等待状态,要么处于工作者执行状态,状态划分清晰,调度规则统一规范。

具体运行流程遵循固定流转逻辑:当外部任务事件到达线程池时,当前处于领导者状态的线程,先完成事件分离与任务拆解,确认任务具备执行条件后,从所有追随者线程中按预设规则推选一个线程成为新的继任领导者,接替后续的事件监听与任务分配职责;完成继任推选后,原领导者线程主动切换为工作者状态,专注处理当前拆解后的具体任务,不再承担事件监听工作。待工作者线程完成全部任务执行逻辑后,自动退出工作者状态,重新回归追随者状态,进入等待队列,等待后续被推选为领导者或承接新任务。

该模式调度逻辑规整,线程状态切换有序,能够有效减少多线程间的锁竞争与频繁上下文切换,适配高并发、低延迟的事件驱动型场景。目前ACE框架中已提供该模式的成熟实现,可直接依托框架完成集成,无需从零搭建完整调度逻辑,适合对并发稳定性、响应速度要求较高的服务端任务处理场景。

2222

半同步半异步模式

也被称作生产者消费者模式,实现逻辑简洁易懂,是线程池最常用的实现形式,整体分为同步服务层、排队层、异步服务层三层结构,层级职责清晰,分工明确。同步服务层负责接收上层并发任务请求,将任务转入排队层等待,无需关注任务后续执行细节,主线程可持续接收新请求,不会出现阻塞;排队层作为中间缓冲模块,统一存放待执行任务,承接上层任务提交与下层任务调取的双向流程,保障任务有序调度;异步服务层包含多个预先创建的线程,循环从排队层调取任务并行处理,线程提前初始化完成,不会随任务量激增新建线程,大幅减少资源损耗。该模式可高效处理上层并发请求,但因线程间存在数据交互,不太适合大数据量直接交换的场景。

3333

线程池实现

半同步半异步模式的三层结构中,排队层是衔接上下层的核心模块,上层提交任务与下层调取任务的操作均围绕其展开,实现时需将排队层设计为线程安全的同步队列,保障多线程同时添加、取出任务的操作安全,避免数据竞争与异常。

线程池包含两个核心活动流程,一是向同步队列添加任务,二是从同步队列取出任务。线程池启动后,会先初始化指定数量的异步层线程,若队列无任务,此类线程进入等待休眠状态;队列新增任务后,会自动唤醒等待线程执行任务。同步服务层持续向队列提交任务,若任务量过大、异步层处理不及时,队列任务会持续堆积,无上限控制的队列极易引发内存暴涨问题,因此同步队列需设置任务数量上限,达到上限后暂停接收新任务,起到系统保护作用。

同步队列的实现依托C++11多项核心技术,通过互斥锁实现线程同步,通过条件变量实现线程间通信,搭配右值引用、std::move实现移动语义、std::forward实现完美转发。条件变量负责队列空满状态的线程等待与唤醒,队列空时消费者线程进入等待,队列满时生产者线程进入等待;同时增设停止接口,支持手动终止任务队列运行,保障线程安全退出。为优化执行效率,同步队列可调整任务调取逻辑,单次加锁即可取出队列全部任务,减少频繁加锁的开销,结合移动语义避免数据复制,进一步提升整体运行效率。

同步队列的核心操作包含三类:Take操作负责获取任务,通过互斥锁与条件变量判断队列状态,满足条件后取出任务并唤醒提交任务的等待线程;Add操作负责提交任务,加锁后判断队列容量,满足条件后存入任务并唤醒等待任务的工作线程;Stop操作负责终止队列运行,修改运行标识后唤醒所有等待线程,保障线程安全退出,无资源泄漏。

各类线程池详细实现

固定线程池

固定线程池创建时需指定线程数量,达到最大线程数后不再新增线程,任务提交后直接分配给空闲线程,多余任务进入队列等待。线程池主要包含线程组、同步队列、运行状态标识三类核心成员变量:线程组预置指定数量线程,建议数量匹配CPU核心数以提升运算效率,线程循环从同步队列取任务执行;同步队列管控任务上限,避免任务过度堆积;运行状态采用原子变量保障多线程操作安全,避免并发修改异常。

固定线程池的运行流程包含启动、任务执行、停止三个核心环节:启动函数初始化运行状态,创建指定数量线程加入线程组;工作线程循环检测运行状态,获取任务后有序执行;停止函数通过一次性调用标识保障停止逻辑仅执行一次,先终止同步队列,再等待所有线程执行完毕后清空线程组。针对任务提交,提供左值与右值两种重载接口,结合完美转发适配不同任务类型;同时设计拒绝策略,队列满时任务提交失败,可由提交任务的线程直接执行,避免任务丢失。为简化返回值获取流程,可通过模板、智能指针、std::packaged_taskstd::future优化提交接口,支持任意可调用对象与参数,便捷获取任务执行结果。

固定线程池的测试场景包含两类,一是借助std::promisestd::future实现带返回值的加法任务测试,二是动态内存分配与释放的任务测试,验证线程池对不同类型任务的处理能力。其适用场景涵盖并发限制场景、稳定任务执行场景、服务器请求处理场景、批量数据处理场景,使用时需结合系统负载与任务特性,合理配置线程数量与队列上限。

固定线程池依托线程预创建、同步队列线程安全管控、任务复用三大核心逻辑实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#include <iostream>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <functional>
#include <atomic>
#include <future>

// 全局通用线程安全同步队列,所有线程池共用基类,无重复定义
template <typename T>
class SyncQueue {
private:
std::queue<T> _task_queue;
std::mutex _mutex;
std::condition_variable _cv_not_empty;
std::condition_variable _cv_not_full;
size_t _max_capacity;
std::atomic<bool> _stop_flag{false};

public:
explicit SyncQueue(size_t max_cap = 1024) : _max_capacity(max_cap) {}

// 停止队列,唤醒所有等待线程,实现安全终止
void stop_queue() {
_stop_flag = true;
_cv_not_empty.notify_all();
_cv_not_full.notify_all();
}

// 判断队列是否停止,供子类调用
bool is_stop() const {
return _stop_flag;
}

// 任务入队,支持右值传递与完美转发
void push_task(T&& task) {
if (_stop_flag) return;
std::unique_lock<std::mutex> lock(_mutex);
// 队列满时等待,直至队列有空余或队列停止
_cv_not_full.wait(lock, [this]() {
return _stop_flag || _task_queue.size() < _max_capacity;
});
if (_stop_flag) return;
_task_queue.emplace(std::forward<T>(task));
_cv_not_empty.notify_one();
}

// 任务出队,返回是否成功获取任务
bool pop_task(T& out_task) {
if (_stop_flag && _task_queue.empty()) return false;
std::unique_lock<std::mutex> lock(_mutex);
// 队列空时等待,直至队列有任务或队列停止
_cv_not_empty.wait(lock, [this]() {
return _stop_flag || !_task_queue.empty();
});
if (_stop_flag || _task_queue.empty()) return false;
out_task = std::move(_task_queue.front());
_task_queue.pop();
_cv_not_full.notify_one();
return true;
}

bool is_empty() const { return _task_queue.empty(); }
size_t size() const { return _task_queue.size(); }
};

// 固定线程池完整实现
class FixedThreadPool {
private:
std::vector<std::thread> _worker_threads;
SyncQueue<std::function<void()>> _task_queue;
std::atomic<bool> _pool_running{true};
size_t _thread_count;

// 工作线程主逻辑,循环获取并执行任务
void worker_loop() {
while (_pool_running) {
std::function<void()> task;
if (_task_queue.pop_task(task)) {
if (task) task();
}
}
}

public:
// 构造函数,默认线程数为CPU核心数
explicit FixedThreadPool(size_t thread_num = std::thread::hardware_concurrency())
: _thread_count(thread_num), _task_queue(1024) {
_worker_threads.reserve(_thread_count);
for (size_t i = 0; i < _thread_count; ++i) {
_worker_threads.emplace_back(&FixedThreadPool::worker_loop, this);
}
}

// 提交任意可调用任务,支持带返回值
template <typename F, typename... Args>
auto submit_task(F&& func, Args&&... args) -> std::future<decltype(func(args...))> {
using ReturnType = decltype(func(args...));
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(func), std::forward<Args>(args)...)
);
_task_queue.push_task([task]() { (*task)(); });
return task->get_future();
}

// 安全关闭线程池,等待所有任务执行完毕
void shutdown_pool() {
if (!_pool_running) return;
_pool_running = false;
_task_queue.stop_queue();
for (auto& t : _worker_threads) {
if (t.joinable()) t.join();
}
}

~FixedThreadPool() {
shutdown_pool();
}

// 禁用拷贝与移动,避免线程安全问题
FixedThreadPool(const FixedThreadPool&) = delete;
FixedThreadPool& operator=(const FixedThreadPool&) = delete;
FixedThreadPool(FixedThreadPool&&) = delete;
FixedThreadPool& operator=(FixedThreadPool&&) = delete;
};

// 固定线程池测试示例
void test_fixed_pool() {
FixedThreadPool pool(4);
// 提交普通无返回值任务
for (int i = 0; i < 8; ++i) {
pool.submit_task([i]() {
std::printf("固定线程池执行任务:%d | 线程ID:%zu\n", i, std::hash<std::thread::id>{}(std::this_thread::get_id()));
std::this_thread::sleep_for(std::chrono::milliseconds(200));
});
}
// 提交带返回值任务
auto res_future = pool.submit_task([]() -> int {
std::this_thread::sleep_for(std::chrono::milliseconds(300));
return 666;
});
std::cout << "任务返回值:" << res_future.get() << std::endl;
pool.shutdown_pool();
}

同步队列线程安全管控、线程预创建与复用、有界队列防内存溢出、任务完美转发、安全关闭机制,完全匹配固定线程池所有特性与适用场景。

缓存线程池

缓存线程池支持线程动态创建与空闲超时销毁,在固定线程池基础上扩展动态线程管理、空闲回收逻辑,适配短任务高并发、任务量波动大的场景,以下为完整可运行实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#include <unordered_map>
// 直接继承全局通用SyncQueue,新增超时等待能力
template <typename T>
class TimedSyncQueue : public SyncQueue<T> {
public:
using SyncQueue<T>::SyncQueue;
// 超时出队,支持指定等待时间,超时后返回false
bool pop_task_timeout(T& out_task, int timeout_sec = 60) {
if (this->is_stop() && this->is_empty()) return false;
std::unique_lock<std::mutex> lock(this->_mutex);
// 带超时等待,超出指定时间未获取到任务则退出等待
if (!this->_cv_not_empty.wait_for(lock, std::chrono::seconds(timeout_sec), [this]() {
return this->is_stop() || !this->is_empty();
})) return false;
if (this->is_stop() || this->is_empty()) return false;
out_task = std::move(this->_task_queue.front());
this->_task_queue.pop();
this->_cv_not_full.notify_one();
return true;
}
};

// 缓存线程池完整实现
class CachedThreadPool {
private:
TimedSyncQueue<std::function<void()>> _task_queue;
std::unordered_map<std::thread::id, std::thread> _worker_map;
std::atomic<size_t> _core_threads{2}; // 基础常驻线程数
std::atomic<size_t> _max_threads{16}; // 最大线程数上限
std::atomic<size_t> _current_threads{0}; // 当前存活线程数
std::atomic<size_t> _idle_threads{0}; // 空闲线程数
std::atomic<bool> _pool_running{true};
const int _idle_timeout = 60; // 空闲超时时间(秒)

void worker_loop() {
_idle_threads++;
while (_pool_running) {
std::function<void()> task;
// 超时等待任务,超时后判断是否销毁线程
if (_task_queue.pop_task_timeout(task, _idle_timeout)) {
_idle_threads--;
if (task) task();
_idle_threads++;
} else {
// 超时且超过基础线程数,自动销毁当前线程
if (_current_threads > _core_threads && _pool_running) {
break;
}
}
}
_current_threads--;
_idle_threads--;
}

// 创建新工作线程
void create_worker() {
if (_current_threads >= _max_threads) return;
std::thread t(&CachedThreadPool::worker_loop, this);
std::thread::id tid = t.get_id();
_worker_map.emplace(tid, std::move(t));
_current_threads++;
}

public:
explicit CachedThreadPool(size_t core = 2, size_t max = 16)
: _core_threads(core), _max_threads(max) {
// 初始化基础常驻线程
for (size_t i = 0; i < _core_threads; ++i) create_worker();
}

template <typename F, typename... Args>
auto submit_task(F&& func, Args&&... args) {
using ReturnType = decltype(func(args...));
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(func), std::forward<Args>(args)...)
);
// 无空闲线程且未达上限,立即创建新线程
if (_idle_threads == 0 && _current_threads < _max_threads) {
create_worker();
}
_task_queue.push_task([task]() { (*task)(); });
return task->get_future();
}

void shutdown_pool() {
_pool_running = false;
_task_queue.stop_queue();
for (auto& pair : _worker_map) {
if (pair.second.joinable()) pair.second.join();
}
_worker_map.clear();
}

~CachedThreadPool() {
shutdown_pool();
}
};

// 缓存线程池测试示例
void test_cached_pool() {
CachedThreadPool pool(2, 8);
for (int i = 0; i < 12; ++i) {
pool.submit_task([i]() {
std::printf("缓存线程池执行任务:%d | 线程ID:%zu\n", i, std::hash<std::thread::id>{}(std::this_thread::get_id()));
std::this_thread::sleep_for(std::chrono::milliseconds(150));
});
}
std::this_thread::sleep_for(std::chrono::seconds(2));
pool.shutdown_pool();
}

线程动态增减、空闲超时回收、线程数上下限管控、哈希表存储线程,完全匹配缓存线程池动态调优、高响应速度的核心特性。

工作窃取线程池

工作窃取线程池依托多独立队列与任务窃取机制实现多核负载均衡,减少线程竞争,适配多核CPU密集型、任务拆分型场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <algorithm>
// 工作窃取专属线程安全队列,独立实现不依赖通用SyncQueue
template <typename T>
class StealQueue {
private:
std::deque<T> _deque;
std::mutex _mutex;
public:
void push(T&& task) {
std::lock_guard<std::mutex> lock(_mutex);
_deque.emplace_back(std::forward<T>(task));
}
bool pop(T& out) {
std::lock_guard<std::mutex> lock(_mutex);
if (_deque.empty()) return false;
out = std::move(_deque.front());
_deque.pop_front();
return true;
}
bool steal(T& out) {
std::lock_guard<std::mutex> lock(_mutex);
if (_deque.size() <= 1) return false;
out = std::move(_deque.back());
_deque.pop_back();
return true;
}
bool empty() { std::lock_guard<std::mutex> lock(_mutex); return _deque.empty(); }
};

// 工作窃取线程池完整实现
class StealThreadPool {
private:
std::vector<std::thread> _workers;
std::vector<std::unique_ptr<StealQueue<std::function<void()>>>> _queues;
std::atomic<bool> _running{true};
size_t _thread_num;

void worker_loop(size_t index) {
while (_running) {
std::function<void()> task;
// 优先执行自身队列任务
if (_queues[index]->pop(task)) { task(); continue; }
// 自身队列为空,窃取其他队列任务
for (size_t i = 0; i < _thread_num; ++i) {
if (i == index) continue;
if (_queues[i]->steal(task)) { task(); break; }
}
std::this_thread::yield();
}
}

public:
explicit StealThreadPool(size_t num = std::thread::hardware_concurrency())
: _thread_num(num) {
_queues.reserve(_thread_num);
for (size_t i = 0; i < _thread_num; ++i) {
_queues.emplace_back(std::make_unique<StealQueue<std::function<void()>>>());
}
for (size_t i = 0; i < _thread_num; ++i) {
_workers.emplace_back(&StealThreadPool::worker_loop, this, i);
}
}

template <typename F, typename... Args>
void submit_task(F&& func, Args&&... args) {
auto task = std::bind(std::forward<F>(func), std::forward<Args>(args)...);
// 轮询提交到不同队列,均衡任务分布
static std::atomic<size_t> idx{0};
size_t cur = idx++ % _thread_num;
_queues[cur]->push(std::move(task));
}

void shutdown_pool() {
_running = false;
for (auto& t : _workers) if (t.joinable()) t.join();
}

~StealThreadPool() { shutdown_pool(); }
};

多独立任务队列、任务窃取逻辑、多核负载均衡、CPU利用率优化,完全匹配工作窃取线程池的核心运行机制。

此类线程池适合任务分解型应用、递归型任务、高吞吐量任务、CPU密集型任务,能够充分发挥多核处理器优势;若任务量较少或存在大量IO阻塞,运行效果不如其他类型线程池,需结合任务特性合理选型。

工作窃取线程池

计划线程池

计划线程池主打定时与周期性任务调度,依托优先队列与时间戳调度实现精准定时,适配定时运维、延时执行类场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <chrono>
#include <queue>
// 定时任务结构体
struct TimerTask {
std::function<void()> task;
std::chrono::steady_clock::time_point execute_time;
bool operator<(const TimerTask& other) const {
return execute_time > other.execute_time;
}
};

// 计划线程池完整实现
class ScheduledThreadPool {
private:
std::thread _worker;
std::priority_queue<TimerTask> _task_queue;
std::mutex _mutex;
std::condition_variable _cv;
std::atomic<bool> _running{true};

void worker_loop() {
while (_running) {
std::unique_lock<std::mutex> lock(_mutex);
if (_task_queue.empty()) {
_cv.wait(lock);
continue;
}
auto now = std::chrono::steady_clock::now();
auto& top_task = _task_queue.top();
if (top_task.execute_time <= now) {
auto task = std::move(top_task.task);
_task_queue.pop();
lock.unlock();
task();
lock.lock();
} else {
_cv.wait_until(lock, top_task.execute_time);
}
}
}

public:
ScheduledThreadPool() { _worker = std::thread(&ScheduledThreadPool::worker_loop, this); }

// 延迟执行任务,参数为延迟毫秒数
void schedule_delay(std::function<void()> task, int delay_ms) {
std::lock_guard<std::mutex> lock(_mutex);
_task_queue.push({std::move(task), std::chrono::steady_clock::now() + std::chrono::milliseconds(delay_ms)});
_cv.notify_one();
}

void shutdown_pool() {
_running = false;
_cv.notify_one();
if (_worker.joinable()) _worker.join();
}

~ScheduledThreadPool() { shutdown_pool(); }
};

// 计划线程池测试示例
void test_scheduled_pool() {
ScheduledThreadPool pool;
std::cout << "提交延迟任务,等待500ms执行" << std::endl;
pool.schedule_delay([]() {
std::printf("定时任务执行 | 线程ID:%zu\n", std::hash<std::thread::id>{}(std::this_thread::get_id()));
}, 500);
std::this_thread::sleep_for(std::chrono::seconds(1));
pool.shutdown_pool();
}

优先队列时间排序、定时等待执行、延迟任务调度,完全匹配计划线程池定时、周期性任务执行的核心特性。

任务适配与线程数配置

线程池处理的任务主要分为CPU密集型与IO密集型两类,两类任务的运行特性差异,直接决定了线程数量的配置逻辑。CPU密集型任务以持续运算为主,比如加解密、压缩解压、排序搜索、数值计算等,线程过多会引发频繁上下文切换,反而降低运算效率,线程数建议设置为CPU核心数加一;IO密集型任务包含大量数据交互、文件读写、网络通信、数据库访问等操作,线程多数时间处于IO等待状态,可适当增加线程数提升并发度,常规配置为CPU核心数的两倍。

提升硬件利用率方面,单核系统可通过多线程重叠IO操作与CPU计算,避免CPU空闲等待;多核系统可结合任务类型,合理分配线程数量,让IO等待与CPU运算并行执行,最大化发挥硬件性能。实际项目使用中,需结合任务耗时比例、系统负载、服务器配置动态调整线程参数,避免线程数量过多或过少导致的性能损耗。

线程池详解

线程池基础概述

在处理大量并发任务时,传统逐任务创建线程的方式会产生大量线程创建与销毁的系统资源开销,同时增加线程上下文切换的额外损耗,线程池技术正是为缓解这类问题而生。线程池的基础运行逻辑是在系统内部预先创建一定数量的线程,当任务请求传入时,直接从池中调配已就绪线程处理任务;线程完成当前任务后不会被销毁,而是重新回归线程池待命,持续承接后续任务,实现线程的循环复用。

暂时无法在豆包文档外展示此内容

线程池具备多项实用特性,可适配多场景并发任务处理需求:其一为线程复用,通过维护固定数量的就绪线程重复执行任务,规避频繁创建销毁线程带来的资源损耗;其二是并发规模管控,针对多核处理器环境,可限定同时运行的线程数量,避免资源过度占用与线程竞争导致的运行效率下降;其三为任务排队机制,线程全量占用时,新任务会存入任务队列等待,队列可选择有界或无界类型,适配不同调度需求;其四是提升任务响应速度,任务到达后无需等待线程创建,可直接分配执行;其五支持线程状态管理,可设置线程属性、追踪线程运行状态、统计任务执行情况;其六实现系统资源管控,结合线程数量限制与队列机制,负载过高时可按配置调整线程数量,维持系统稳定运行;其七具备任务执行灵活性,可承接不同类型任务,支持普通提交、定时执行、周期性执行等多种调度方式。

借助C++11实现线程池,需要掌握多项核心相关知识:涵盖多线程基础理论,包括线程基础、线程同步、线程互斥、原子操作等内容;熟悉C++11标准并发库,涉及thread、mutex、condition_variable、atomic、unique_lock等核心组件;掌握C++11完美转发、lambda表达式的用法;熟练运用C++11智能指针;了解C++ STL容器库的基础操作。线程池适配主流两类开发环境,Windows平台可使用VS2019,Linux平台需搭配支持C++11及以上版本的g++编译器。

线程池常见分类

线程池可按照应用场景划分为五种常见类型,各类线程池的运行逻辑、调度规则与适配场景各有差异,实际使用中可根据任务特性选择对应类型,最大化发挥并发效率。

固定线程池

此类线程池的线程数量保持固定,线程创建后长期存活,不会随任务数量增减动态调整,超出线程承载能力的任务会存入队列等待。它适合任务量相对固定、单任务耗时较长的场景,能够稳定管控并发线程规模,避免系统资源波动,保障运行稳定性。

缓存线程池

线程数量支持动态灵活调整,新任务到达时,优先复用空闲线程;若无空闲线程,则新建线程加入池中;线程闲置超过默认60秒后,会被自动移除并销毁,释放闲置资源。它适合任务提交量大、单任务耗时较短的场景,可快速响应突发任务,闲置线程回收能减少长期无效资源占用,提升资源利用率。

单线程池

仅启用一个工作线程处理所有任务,可严格保障任务按照指定顺序执行,常见执行顺序包括先进先出、后进先出、优先级排序等。它适合需要严格保证任务执行次序的场景,彻底避免多线程并发导致的执行顺序混乱,确保任务执行逻辑可控。

工作窃取线程池

内置多个独立任务队列,可创建足量线程匹配系统并行级别,通过工作窃取机制提升多核CPU利用率,避免处理器闲置。当前线程完成自身队列任务后,可从全局队列或其他线程的任务队列中调取任务执行,实现多线程负载均衡,充分发挥多核处理器的并行运算能力。

计划线程池

也称作定时线程池或调度线程池,核心支持定时执行与周期性执行任务,可灵活设定任务执行的时间间隔或具体执行时间点。它适合需要定期执行的任务场景,比如定时数据备份、日志清理、定时邮件发送、系统心跳检测等常规运维类任务。

线程池主流运行模式

线程池的运行主要分为两种经典模式,二者实现逻辑、调度方式与适用场景差异明显,可根据项目并发需求与业务特性选择适配模式。

领导者跟随者模式

领导者跟随者模式是线程池的经典运行模式之一,核心围绕线程状态划分与有序调度展开,全程维持线程状态的稳定切换,避免多线程同时抢占事件资源,有效减少线程竞争带来的额外开销。该模式下,线程池内的所有工作线程会被划分为三种固定状态,分别是领导者状态、追随者状态、工作者状态,且同一时刻线程池中仅有一个线程处于领导者状态,其余线程要么处于追随者等待状态,要么处于工作者执行状态,状态划分清晰,调度规则统一规范。

具体运行流程遵循固定流转逻辑:当外部任务事件到达线程池时,当前处于领导者状态的线程,先完成事件分离与任务拆解,确认任务具备执行条件后,从所有追随者线程中按预设规则推选一个线程成为新的继任领导者,接替后续的事件监听与任务分配职责;完成继任推选后,原领导者线程主动切换为工作者状态,专注处理当前拆解后的具体任务,不再承担事件监听工作。待工作者线程完成全部任务执行逻辑后,自动退出工作者状态,重新回归追随者状态,进入等待队列,等待后续被推选为领导者或承接新任务。

该模式调度逻辑规整,线程状态切换有序,能够有效减少多线程间的锁竞争与频繁上下文切换,适配高并发、低延迟的事件驱动型场景。目前ACE框架中已提供该模式的成熟实现,可直接依托框架完成集成,无需从零搭建完整调度逻辑,适合对并发稳定性、响应速度要求较高的服务端任务处理场景。

暂时无法在豆包文档外展示此内容

半同步半异步模式

也被称作生产者消费者模式,实现逻辑简洁易懂,是线程池最常用的实现形式,整体分为同步服务层、排队层、异步服务层三层结构,层级职责清晰,分工明确。同步服务层负责接收上层并发任务请求,将任务转入排队层等待,无需关注任务后续执行细节,主线程可持续接收新请求,不会出现阻塞;排队层作为中间缓冲模块,统一存放待执行任务,承接上层任务提交与下层任务调取的双向流程,保障任务有序调度;异步服务层包含多个预先创建的线程,循环从排队层调取任务并行处理,线程提前初始化完成,不会随任务量激增新建线程,大幅减少资源损耗。该模式可高效处理上层并发请求,但因线程间存在数据交互,不太适合大数据量直接交换的场景。

暂时无法在豆包文档外展示此内容

线程池实现关键逻辑

半同步半异步模式的三层结构中,排队层是衔接上下层的核心模块,上层提交任务与下层调取任务的操作均围绕其展开,实现时需将排队层设计为线程安全的同步队列,保障多线程同时添加、取出任务的操作安全,避免数据竞争与异常。

线程池包含两个核心活动流程,一是向同步队列添加任务,二是从同步队列取出任务。线程池启动后,会先初始化指定数量的异步层线程,若队列无任务,此类线程进入等待休眠状态;队列新增任务后,会自动唤醒等待线程执行任务。同步服务层持续向队列提交任务,若任务量过大、异步层处理不及时,队列任务会持续堆积,无上限控制的队列极易引发内存暴涨问题,因此同步队列需设置任务数量上限,达到上限后暂停接收新任务,起到系统保护作用。

暂时无法在豆包文档外展示此内容

同步队列的实现依托C++11多项核心技术,通过互斥锁实现线程同步,通过条件变量实现线程间通信,搭配右值引用、std::move实现移动语义、std::forward实现完美转发。条件变量负责队列空满状态的线程等待与唤醒,队列空时消费者线程进入等待,队列满时生产者线程进入等待;同时增设停止接口,支持手动终止任务队列运行,保障线程安全退出。为优化执行效率,同步队列可调整任务调取逻辑,单次加锁即可取出队列全部任务,减少频繁加锁的开销,结合移动语义避免数据复制,进一步提升整体运行效率。

同步队列的核心操作包含三类:Take操作负责获取任务,通过互斥锁与条件变量判断队列状态,满足条件后取出任务并唤醒提交任务的等待线程;Add操作负责提交任务,加锁后判断队列容量,满足条件后存入任务并唤醒等待任务的工作线程;Stop操作负责终止队列运行,修改运行标识后唤醒所有等待线程,保障线程安全退出,无资源泄漏。

各类线程池详细实现

固定线程池

固定线程池创建时需指定线程数量,达到最大线程数后不再新增线程,任务提交后直接分配给空闲线程,多余任务进入队列等待。线程池主要包含线程组、同步队列、运行状态标识三类核心成员变量:线程组预置指定数量线程,建议数量匹配CPU核心数以提升运算效率,线程循环从同步队列取任务执行;同步队列管控任务上限,避免任务过度堆积;运行状态采用原子变量保障多线程操作安全,避免并发修改异常。

固定线程池的运行流程包含启动、任务执行、停止三个核心环节:启动函数初始化运行状态,创建指定数量线程加入线程组;工作线程循环检测运行状态,获取任务后有序执行;停止函数通过一次性调用标识保障停止逻辑仅执行一次,先终止同步队列,再等待所有线程执行完毕后清空线程组。针对任务提交,提供左值与右值两种重载接口,结合完美转发适配不同任务类型;同时设计拒绝策略,队列满时任务提交失败,可由提交任务的线程直接执行,避免任务丢失。为简化返回值获取流程,可通过模板、智能指针、std::packaged_task与std::future优化提交接口,支持任意可调用对象与参数,便捷获取任务执行结果。

固定线程池的测试场景包含两类,一是借助std::promise与std::future实现带返回值的加法任务测试,二是动态内存分配与释放的任务测试,验证线程池对不同类型任务的处理能力。其适用场景涵盖并发限制场景、稳定任务执行场景、服务器请求处理场景、批量数据处理场景,使用时需结合系统负载与任务特性,合理配置线程数量与队列上限。

固定线程池依托线程预创建、同步队列线程安全管控、任务复用三大核心逻辑实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#include <iostream>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <functional>
#include <atomic>
#include <future>

// 全局通用线程安全同步队列,所有线程池共用基类,无重复定义
template <typename T>
class SyncQueue {
private:
std::queue<T> _task_queue;
std::mutex _mutex;
std::condition_variable _cv_not_empty;
std::condition_variable _cv_not_full;
size_t _max_capacity;
std::atomic<bool> _stop_flag{false};

public:
explicit SyncQueue(size_t max_cap = 1024) : _max_capacity(max_cap) {}

// 停止队列,唤醒所有等待线程,实现安全终止
void stop_queue() {
_stop_flag = true;
_cv_not_empty.notify_all();
_cv_not_full.notify_all();
}

// 判断队列是否停止,供子类调用
bool is_stop() const {
return _stop_flag;
}

// 任务入队,支持右值传递与完美转发
void push_task(T&& task) {
if (_stop_flag) return;
std::unique_lock<std::mutex> lock(_mutex);
// 队列满时等待,直至队列有空余或队列停止
_cv_not_full.wait(lock, [this]() {
return _stop_flag || _task_queue.size() < _max_capacity;
});
if (_stop_flag) return;
_task_queue.emplace(std::forward<T>(task));
_cv_not_empty.notify_one();
}

// 任务出队,返回是否成功获取任务
bool pop_task(T& out_task) {
if (_stop_flag && _task_queue.empty()) return false;
std::unique_lock<std::mutex> lock(_mutex);
// 队列空时等待,直至队列有任务或队列停止
_cv_not_empty.wait(lock, [this]() {
return _stop_flag || !_task_queue.empty();
});
if (_stop_flag || _task_queue.empty()) return false;
out_task = std::move(_task_queue.front());
_task_queue.pop();
_cv_not_full.notify_one();
return true;
}

bool is_empty() const { return _task_queue.empty(); }
size_t size() const { return _task_queue.size(); }
};

// 固定线程池完整实现
class FixedThreadPool {
private:
std::vector<std::thread> _worker_threads;
SyncQueue<std::function<void()>> _task_queue;
std::atomic<bool> _pool_running{true};
size_t _thread_count;

// 工作线程主逻辑,循环获取并执行任务
void worker_loop() {
while (_pool_running) {
std::function<void()> task;
if (_task_queue.pop_task(task)) {
if (task) task();
}
}
}

public:
// 构造函数,默认线程数为CPU核心数
explicit FixedThreadPool(size_t thread_num = std::thread::hardware_concurrency())
: _thread_count(thread_num), _task_queue(1024) {
_worker_threads.reserve(_thread_count);
for (size_t i = 0; i < _thread_count; ++i) {
_worker_threads.emplace_back(&FixedThreadPool::worker_loop, this);
}
}

// 提交任意可调用任务,支持带返回值
template <typename F, typename... Args>
auto submit_task(F&& func, Args&&... args) -> std::future<decltype(func(args...))> {
using ReturnType = decltype(func(args...));
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(func), std::forward<Args>(args)...)
);
_task_queue.push_task([task]() { (*task)(); });
return task->get_future();
}

// 安全关闭线程池,等待所有任务执行完毕
void shutdown_pool() {
if (!_pool_running) return;
_pool_running = false;
_task_queue.stop_queue();
for (auto& t : _worker_threads) {
if (t.joinable()) t.join();
}
}

~FixedThreadPool() {
shutdown_pool();
}

// 禁用拷贝与移动,避免线程安全问题
FixedThreadPool(const FixedThreadPool&) = delete;
FixedThreadPool& operator=(const FixedThreadPool&) = delete;
FixedThreadPool(FixedThreadPool&&) = delete;
FixedThreadPool& operator=(FixedThreadPool&&) = delete;
};

// 固定线程池测试示例
void test_fixed_pool() {
FixedThreadPool pool(4);
// 提交普通无返回值任务
for (int i = 0; i < 8; ++i) {
pool.submit_task([i]() {
std::printf("固定线程池执行任务:%d | 线程ID:%zu\n", i, std::hash<std::thread::id>{}(std::this_thread::get_id()));
std::this_thread::sleep_for(std::chrono::milliseconds(200));
});
}
// 提交带返回值任务
auto res_future = pool.submit_task([]() -> int {
std::this_thread::sleep_for(std::chrono::milliseconds(300));
return 666;
});
std::cout << "任务返回值:" << res_future.get() << std::endl;
pool.shutdown_pool();
}

同步队列线程安全管控、线程预创建与复用、有界队列防内存溢出、任务完美转发、安全关闭机制,完全匹配固定线程池所有特性与适用场景。

缓存线程池

缓存线程池支持线程动态创建与空闲超时销毁,在固定线程池基础上扩展动态线程管理、空闲回收逻辑,适配短任务高并发、任务量波动大的场景,以下为完整可运行实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#include <unordered_map>
// 直接继承全局通用SyncQueue,新增超时等待能力
template <typename T>
class TimedSyncQueue : public SyncQueue<T> {
public:
using SyncQueue<T>::SyncQueue;
// 超时出队,支持指定等待时间,超时后返回false
bool pop_task_timeout(T& out_task, int timeout_sec = 60) {
if (this->is_stop() && this->is_empty()) return false;
std::unique_lock<std::mutex> lock(this->_mutex);
// 带超时等待,超出指定时间未获取到任务则退出等待
if (!this->_cv_not_empty.wait_for(lock, std::chrono::seconds(timeout_sec), [this]() {
return this->is_stop() || !this->is_empty();
})) return false;
if (this->is_stop() || this->is_empty()) return false;
out_task = std::move(this->_task_queue.front());
this->_task_queue.pop();
this->_cv_not_full.notify_one();
return true;
}
};

// 缓存线程池完整实现
class CachedThreadPool {
private:
TimedSyncQueue<std::function<void()>> _task_queue;
std::unordered_map<std::thread::id, std::thread> _worker_map;
std::atomic<size_t> _core_threads{2}; // 基础常驻线程数
std::atomic<size_t> _max_threads{16}; // 最大线程数上限
std::atomic<size_t> _current_threads{0}; // 当前存活线程数
std::atomic<size_t> _idle_threads{0}; // 空闲线程数
std::atomic<bool> _pool_running{true};
const int _idle_timeout = 60; // 空闲超时时间(秒)

void worker_loop() {
_idle_threads++;
while (_pool_running) {
std::function<void()> task;
// 超时等待任务,超时后判断是否销毁线程
if (_task_queue.pop_task_timeout(task, _idle_timeout)) {
_idle_threads--;
if (task) task();
_idle_threads++;
} else {
// 超时且超过基础线程数,自动销毁当前线程
if (_current_threads > _core_threads && _pool_running) {
break;
}
}
}
_current_threads--;
_idle_threads--;
}

// 创建新工作线程
void create_worker() {
if (_current_threads >= _max_threads) return;
std::thread t(&CachedThreadPool::worker_loop, this);
std::thread::id tid = t.get_id();
_worker_map.emplace(tid, std::move(t));
_current_threads++;
}

public:
explicit CachedThreadPool(size_t core = 2, size_t max = 16)
: _core_threads(core), _max_threads(max) {
// 初始化基础常驻线程
for (size_t i = 0; i < _core_threads; ++i) create_worker();
}

template <typename F, typename... Args>
auto submit_task(F&& func, Args&&... args) {
using ReturnType = decltype(func(args...));
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(func), std::forward<Args>(args)...)
);
// 无空闲线程且未达上限,立即创建新线程
if (_idle_threads == 0 && _current_threads < _max_threads) {
create_worker();
}
_task_queue.push_task([task]() { (*task)(); });
return task->get_future();
}

void shutdown_pool() {
_pool_running = false;
_task_queue.stop_queue();
for (auto& pair : _worker_map) {
if (pair.second.joinable()) pair.second.join();
}
_worker_map.clear();
}

~CachedThreadPool() {
shutdown_pool();
}
};

// 缓存线程池测试示例
void test_cached_pool() {
CachedThreadPool pool(2, 8);
for (int i = 0; i < 12; ++i) {
pool.submit_task([i]() {
std::printf("缓存线程池执行任务:%d | 线程ID:%zu\n", i, std::hash<std::thread::id>{}(std::this_thread::get_id()));
std::this_thread::sleep_for(std::chrono::milliseconds(150));
});
}
std::this_thread::sleep_for(std::chrono::seconds(2));
pool.shutdown_pool();
}

线程动态增减、空闲超时回收、线程数上下限管控、哈希表存储线程,完全匹配缓存线程池动态调优、高响应速度的核心特性。

工作窃取线程池

工作窃取线程池依托多独立队列与任务窃取机制实现多核负载均衡,减少线程竞争,适配多核CPU密集型、任务拆分型场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <algorithm>
// 工作窃取专属线程安全队列,独立实现不依赖通用SyncQueue
template <typename T>
class StealQueue {
private:
std::deque<T> _deque;
std::mutex _mutex;
public:
void push(T&& task) {
std::lock_guard<std::mutex> lock(_mutex);
_deque.emplace_back(std::forward<T>(task));
}
bool pop(T& out) {
std::lock_guard<std::mutex> lock(_mutex);
if (_deque.empty()) return false;
out = std::move(_deque.front());
_deque.pop_front();
return true;
}
bool steal(T& out) {
std::lock_guard<std::mutex> lock(_mutex);
if (_deque.size() <= 1) return false;
out = std::move(_deque.back());
_deque.pop_back();
return true;
}
bool empty() { std::lock_guard<std::mutex> lock(_mutex); return _deque.empty(); }
};

// 工作窃取线程池完整实现
class StealThreadPool {
private:
std::vector<std::thread> _workers;
std::vector<std::unique_ptr<StealQueue<std::function<void()>>>> _queues;
std::atomic<bool> _running{true};
size_t _thread_num;

void worker_loop(size_t index) {
while (_running) {
std::function<void()> task;
// 优先执行自身队列任务
if (_queues[index]->pop(task)) { task(); continue; }
// 自身队列为空,窃取其他队列任务
for (size_t i = 0; i < _thread_num; ++i) {
if (i == index) continue;
if (_queues[i]->steal(task)) { task(); break; }
}
std::this_thread::yield();
}
}

public:
explicit StealThreadPool(size_t num = std::thread::hardware_concurrency())
: _thread_num(num) {
_queues.reserve(_thread_num);
for (size_t i = 0; i < _thread_num; ++i) {
_queues.emplace_back(std::make_unique<StealQueue<std::function<void()>>>());
}
for (size_t i = 0; i < _thread_num; ++i) {
_workers.emplace_back(&StealThreadPool::worker_loop, this, i);
}
}

template <typename F, typename... Args>
void submit_task(F&& func, Args&&... args) {
auto task = std::bind(std::forward<F>(func), std::forward<Args>(args)...);
// 轮询提交到不同队列,均衡任务分布
static std::atomic<size_t> idx{0};
size_t cur = idx++ % _thread_num;
_queues[cur]->push(std::move(task));
}

void shutdown_pool() {
_running = false;
for (auto& t : _workers) if (t.joinable()) t.join();
}

~StealThreadPool() { shutdown_pool(); }
};

多独立任务队列、任务窃取逻辑、多核负载均衡、CPU利用率优化,完全匹配工作窃取线程池的核心运行机制。

此类线程池适合任务分解型应用、递归型任务、高吞吐量任务、CPU密集型任务,能够充分发挥多核处理器优势;若任务量较少或存在大量IO阻塞,运行效果不如其他类型线程池,需结合任务特性合理选型。

暂时无法在豆包文档外展示此内容

计划线程池

计划线程池主打定时与周期性任务调度,依托优先队列与时间戳调度实现精准定时,适配定时运维、延时执行类场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <chrono>
#include <queue>
// 定时任务结构体,计划线程池独立队列实现
struct TimerTask {
std::function<void()> task;
std::chrono::steady_clock::time_point execute_time;
bool operator<(const TimerTask& other) const {
return execute_time > other.execute_time;
}
};

// 计划线程池完整实现
class ScheduledThreadPool {
private:
std::thread _worker;
std::priority_queue<TimerTask> _task_queue;
std::mutex _mutex;
std::condition_variable _cv;
std::atomic<bool> _running{true};

void worker_loop() {
while (_running) {
std::unique_lock<std::mutex> lock(_mutex);
if (_task_queue.empty()) {
_cv.wait(lock);
continue;
}
auto now = std::chrono::steady_clock::now();
auto& top_task = _task_queue.top();
if (top_task.execute_time <= now) {
auto task = std::move(top_task.task);
_task_queue.pop();
lock.unlock();
task();
lock.lock();
} else {
_cv.wait_until(lock, top_task.execute_time);
}
}
}

public:
ScheduledThreadPool() { _worker = std::thread(&ScheduledThreadPool::worker_loop, this); }

// 延迟执行任务,参数为延迟毫秒数
void schedule_delay(std::function<void()> task, int delay_ms) {
std::lock_guard<std::mutex> lock(_mutex);
_task_queue.push({std::move(task), std::chrono::steady_clock::now() + std::chrono::milliseconds(delay_ms)});
_cv.notify_one();
}

void shutdown_pool() {
_running = false;
_cv.notify_one();
if (_worker.joinable()) _worker.join();
}

~ScheduledThreadPool() { shutdown_pool(); }
};

// 计划线程池测试示例
void test_scheduled_pool() {
ScheduledThreadPool pool;
std::cout << "提交延迟任务,等待500ms执行" << std::endl;
pool.schedule_delay([]() {
std::printf("定时任务执行 | 线程ID:%zu\n", std::hash<std::thread::id>{}(std::this_thread::get_id()));
}, 500);
std::this_thread::sleep_for(std::chrono::seconds(1));
pool.shutdown_pool();
}

优先队列时间排序、定时等待执行、延迟任务调度,完全匹配计划线程池定时、周期性任务执行的核心特性。

任务适配与线程数配置

线程池处理的任务主要分为CPU密集型与IO密集型两类,两类任务的运行特性差异,直接决定了线程数量的配置逻辑。CPU密集型任务以持续运算为主,比如加解密、压缩解压、排序搜索、数值计算等,线程过多会引发频繁上下文切换,反而降低运算效率,线程数建议设置为CPU核心数加一;IO密集型任务包含大量数据交互、文件读写、网络通信、数据库访问等操作,线程多数时间处于IO等待状态,可适当增加线程数提升并发度,常规配置为CPU核心数的两倍。

提升硬件利用率方面,单核系统可通过多线程重叠IO操作与CPU计算,避免CPU空闲等待;多核系统可结合任务类型,合理分配线程数量,让IO等待与CPU运算并行执行,最大化发挥硬件性能。实际项目使用中,需结合任务耗时比例、系统负载、服务器配置动态调整线程参数,避免线程数量过多或过少导致的性能损耗。

无锁队列优化

同步锁队列的性能痛点

前文实现的SyncQueue同步队列依赖互斥锁(mutex)+条件变量实现线程安全,在高并发、短任务场景下会出现明显性能损耗:核心问题在于锁竞争与线程阻塞,多线程同时争抢锁资源时,会出现线程挂起、唤醒、上下文切换的额外开销,高并发下锁等待占比升高,直接拉低线程池整体吞吐效率;此外,锁的临界区会强制线程串行执行入队、出队操作,无法充分发挥多核CPU的并行优势,这也是高并发场景下同步锁队列成为性能瓶颈的核心原因。

无锁队列原理

无锁队列完全摒弃互斥锁,依托C++11标准库的**原子操作(atomic)CAS(Compare And Swap,比较并交换)**原语实现线程安全,是高并发场景下队列优化的核心方案。CAS操作是原子性的指令级操作,流程为:先读取目标内存值,对比预期值,若一致则修改为新值,若不一致则说明其他线程已修改,当前线程重试即可,全程无锁、无线程阻塞切换,大幅降低并发开销。

本次实现的无锁队列采用带头结点的单链表结构,适配多生产者多消费者场景(MPMC),专门针对线程池任务调度优化,具备无锁、高吞吐、低延迟的特点,完美替代原有同步锁队列,解决锁竞争问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include <iostream>
#include <atomic>
#include <memory>
#include <functional>

// 无锁队列节点定义
template <typename T>
struct LockFreeNode {
T data;
std::atomic<LockFreeNode*> next;

// 空节点构造,用于头结点
LockFreeNode() : next(nullptr) {}
// 带数据节点构造,用于存储任务
LockFreeNode(T&& val) : data(std::forward<T>(val)), next(nullptr) {}
};

// 多生产者多消费者无锁队列,适配线程池任务调度
template <typename T>
class LockFreeQueue {
private:
std::atomic<LockFreeNode<T>*> _head; // 队列头结点
std::atomic<LockFreeNode<T>*> _tail; // 队列尾结点

public:
LockFreeQueue() {
// 初始化空头结点,简化边界判断
LockFreeNode<T>* dummy = new LockFreeNode<T>();
_head.store(dummy, std::memory_order_relaxed);
_tail.store(dummy, std::memory_order_relaxed);
}

// 禁用拷贝与移动,避免线程安全问题
LockFreeQueue(const LockFreeQueue&) = delete;
LockFreeQueue& operator=(const LockFreeQueue&) = delete;
LockFreeQueue(LockFreeQueue&&) = delete;
LockFreeQueue& operator=(LockFreeQueue&&) = delete;

~LockFreeQueue() {
// 析构时释放所有节点
LockFreeNode<T>* cur = _head.load(std::memory_order_relaxed);
while (cur != nullptr) {
LockFreeNode<T>* next = cur->next.load(std::memory_order_relaxed);
delete cur;
cur = next;
}
}

// 无锁入队,多生产者安全
void enqueue(T&& task) {
LockFreeNode<T>* new_node = new LockFreeNode<T>(std::forward<T>(task));
LockFreeNode<T>* old_tail = nullptr;

while (true) {
// 读取当前尾结点
old_tail = _tail.load(std::memory_order_acquire);
LockFreeNode<T>* next = old_tail->next.load(std::memory_order_acquire);

// 确保尾结点未被其他线程修改
if (old_tail == _tail.load(std::memory_order_acquire)) {
// 尾结点next为空,说明可插入
if (next == nullptr) {
// CAS尝试修改尾结点next指针
if (old_tail->next.compare_exchange_weak(next, new_node,
std::memory_order_release, std::memory_order_relaxed)) {
// CAS成功,更新尾结点
_tail.compare_exchange_strong(old_tail, new_node,
std::memory_order_release, std::memory_order_relaxed);
break;
}
}
// 尾结点已滞后,帮助更新尾结点
else {
_tail.compare_exchange_strong(old_tail, next,
std::memory_order_release, std::memory_order_relaxed);
}
}
}
}

// 无锁出队,多消费者安全,返回是否出队成功
bool dequeue(T& out_task) {
LockFreeNode<T>* old_head = nullptr;

while (true) {
old_head = _head.load(std::memory_order_acquire);
LockFreeNode<T>* old_tail = _tail.load(std::memory_order_acquire);
LockFreeNode<T>* next = old_head->next.load(std::memory_order_acquire);

// 队列空,无任务可出队
if (old_head == old_tail && next == nullptr) {
return false;
}

// 头结点未被修改,执行出队
if (old_head == _head.load(std::memory_order_acquire)) {
// CAS尝试更新头结点
if (_head.compare_exchange_weak(old_head, next,
std::memory_order_release, std::memory_order_relaxed)) {
// 取出任务数据
out_task = std::move(next->data);
// 释放原头结点内存
delete old_head;
return true;
}
}
}
}

// 判断队列是否为空
bool is_empty() {
LockFreeNode<T>* head = _head.load(std::memory_order_relaxed);
LockFreeNode<T>* tail = _tail.load(std::memory_order_relaxed);
LockFreeNode<T>* next = head->next.load(std::memory_order_relaxed);
return (head == tail && next == nullptr);
}
};

在线程池中的应用

无锁队列可直接替换原有线程池中的SyncQueue同步锁队列,无需大幅改动线程池核心逻辑,仅需替换成员变量类型,适配高并发短任务场景,性能提升显著,具体应用方式如下:

无锁队列改造线程池

无锁队列可直接替换原有同步锁队列,无需改动线程池核心调度逻辑,仅需修改队列类型与入队出队接口,以下以固定线程池无锁改造为例,给出完整可运行代码,清晰标注核心修改点位,适配高并发短任务场景:

改造要点

  1. 移除原有SyncQueue同步锁队列,替换为自定义LockFreeQueue无锁队列
  2. 入队方法从 push_task 改为无锁队列 enqueue
  3. 出队方法从 pop_task 改为无锁队列 dequeue
  4. 删除原有锁相关、条件变量相关依赖,保留线程复用、任务提交、优雅关闭逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#include <iostream>
#include <vector>
#include <thread>
#include <functional>
#include <atomic>
#include <future>
#include <atomic>

// 直接复用前文实现的无锁队列,无需重复定义
template <typename T>
struct LockFreeNode {
T data;
std::atomic<LockFreeNode*> next;
LockFreeNode() : next(nullptr) {}
LockFreeNode(T&& val) : data(std::forward<T>(val)), next(nullptr) {}
};

template <typename T>
class LockFreeQueue {
private:
std::atomic<LockFreeNode<T>*> _head;
std::atomic<LockFreeNode<T>*> _tail;
public:
LockFreeQueue() {
LockFreeNode<T>* dummy = new LockFreeNode<T>();
_head.store(dummy, std::memory_order_relaxed);
_tail.store(dummy, std::memory_order_relaxed);
}
~LockFreeQueue() {
LockFreeNode<T>* cur = _head.load(std::memory_order_relaxed);
while (cur != nullptr) {
LockFreeNode<T>* next = cur->next.load(std::memory_order_relaxed);
delete cur;
cur = next;
}
}
// 无锁入队
void enqueue(T&& task) {
LockFreeNode<T>* new_node = new LockFreeNode<T>(std::forward<T>(task));
LockFreeNode<T>* old_tail = nullptr;
while (true) {
old_tail = _tail.load(std::memory_order_acquire);
LockFreeNode<T>* next = old_tail->next.load(std::memory_order_acquire);
if (old_tail == _tail.load(std::memory_order_acquire)) {
if (next == nullptr) {
if (old_tail->next.compare_exchange_weak(next, new_node,
std::memory_order_release, std::memory_order_relaxed)) {
_tail.compare_exchange_strong(old_tail, new_node,
std::memory_order_release, std::memory_order_relaxed);
break;
}
} else {
_tail.compare_exchange_strong(old_tail, next,
std::memory_order_release, std::memory_order_relaxed);
}
}
}
}
// 无锁出队
bool dequeue(T& out_task) {
LockFreeNode<T>* old_head = nullptr;
while (true) {
old_head = _head.load(std::memory_order_acquire);
LockFreeNode<T>* old_tail = _tail.load(std::memory_order_acquire);
LockFreeNode<T>* next = old_head->next.load(std::memory_order_acquire);
if (old_head == old_tail && next == nullptr) return false;
if (old_head == _head.load(std::memory_order_acquire)) {
if (_head.compare_exchange_weak(old_head, next,
std::memory_order_release, std::memory_order_relaxed)) {
out_task = std::move(next->data);
delete old_head;
return true;
}
}
}
}
bool is_empty() {
LockFreeNode<T>* head = _head.load(std::memory_order_relaxed);
LockFreeNode<T>* tail = _tail.load(std::memory_order_relaxed);
LockFreeNode<T>* next = head->next.load(std::memory_order_relaxed);
return (head == tail && next == nullptr);
}
};

// 无锁版固定线程池(核心改造完成)
class LockFreeFixedThreadPool {
private:
std::vector<std::thread> _worker_threads;
// 核心替换:SyncQueue -> LockFreeQueue
LockFreeQueue<std::function<void()>> _task_queue;
std::atomic<bool> _pool_running{true};
size_t _thread_count;

void worker_loop() {
while (_pool_running) {
std::function<void()> task;
// 核心替换:pop_task -> dequeue
if (_task_queue.dequeue(task)) {
if (task) task();
} else {
// 队列为空时线程让出CPU,避免空转过度占用资源
std::this_thread::yield();
}
}
}

public:
explicit LockFreeFixedThreadPool(size_t thread_num = std::thread::hardware_concurrency())
: _thread_count(thread_num) {
_worker_threads.reserve(_thread_count);
for (size_t i = 0; i < _thread_count; ++i) {
_worker_threads.emplace_back(&LockFreeFixedThreadPool::worker_loop, this);
}
}

// 任务提交接口完全保留,兼容原有调用方式
template <typename F, typename... Args>
auto submit_task(F&& func, Args&&... args) -> std::future<decltype(func(args...))> {
using ReturnType = decltype(func(args...));
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(func), std::forward<Args>(args)...)
);
// 核心替换:push_task -> enqueue
_task_queue.enqueue([task]() { (*task)(); });
return task->get_future();
}

// 优雅关闭逻辑完全保留
void shutdown_pool() {
if (!_pool_running) return;
_pool_running = false;
for (auto& t : _worker_threads) {
if (t.joinable()) t.join();
}
}

~LockFreeFixedThreadPool() {
shutdown_pool();
}

LockFreeFixedThreadPool(const LockFreeFixedThreadPool&) = delete;
LockFreeFixedThreadPool& operator=(const LockFreeFixedThreadPool&) = delete;
LockFreeFixedThreadPool(LockFreeFixedThreadPool&&) = delete;
LockFreeFixedThreadPool& operator=(LockFreeFixedThreadPool&&) = delete;
};

// 无锁版线程池测试示例
void test_lockfree_pool() {
LockFreeFixedThreadPool pool(4);
// 提交批量高并发短任务
for (int i = 0; i < 10; ++i) {
pool.submit_task([i]() {
std::printf("无锁线程池执行任务:%d | 线程ID:%zu\n", i, std::hash<std::thread::id>{}(std::this_thread::get_id()));
});
}
// 提交带返回值任务
auto res = pool.submit_task([]() { return 2024; });
std::cout << "无锁线程池任务返回值:" << res.get() << std::endl;
pool.shutdown_pool();
}

线程池注意事项

线程池作为高并发开发的核心组件,想要稳定落地、规避性能陷阱与安全隐患,需结合队列选型、线程配置、任务特性、资源管理等全流程把控注意事项,覆盖开发、调试、运维全阶段,具体实操要点如下:

队列选型与使用

  1. 锁队列与无锁队列按需选择:普通中低并发、IO密集型任务,优先选用带条件变量的SyncQueue同步锁队列,开发难度低、无CPU空转损耗,稳定性更强;高并发短任务、CPU密集型、追求极致吞吐的场景,再选用无锁队列,同时注意无锁队列依赖原子操作,高并发下会存在CPU空转重试,需保证服务器CPU资源充足,且避免手动管理内存引发野指针问题,优先搭配智能指针优化。

    适配场景:无锁队列更适合高并发、短耗时、高频任务提交的场景,比如后端网关请求处理、高QPS服务任务调度、实时数据流转、高频日志处理等;而IO密集型、长耗时任务场景,锁队列的性能差异不明显,可根据业务场景灵活选择。

    优势:相比传统锁队列,无锁队列全程无线程阻塞、无上下文切换开销,多核CPU并行利用率大幅提升,高并发下吞吐能力可提升30%-50%;彻底规避死锁、锁优先级反转、锁竞争等待等问题,线程池运行更稳定,尤其适合多核服务器高并发场景。

  2. 强制使用有界队列:严禁生产环境使用无界队列,防止任务突发激增导致任务无限堆积,占用大量内存甚至引发OOM崩溃,需根据业务峰值提前设定队列最大容量,配合拒绝策略兜底。

  3. 队列操作线程安全:无论是锁队列还是无锁队列,严禁在队列外部手动处理线程同步逻辑,所有入队、出队、清空操作必须调用队列内置的线程安全接口,避免破坏队列内部状态引发数据竞争。

线程数配置与线程管理

  1. 按任务类型精准配置线程数:CPU密集型任务线程数严禁过量,建议设置为CPU核心数+1,减少线程上下文切换损耗;IO密集型任务可适当放宽,配置为CPU核心数的2倍左右,具体需结合业务IO等待比例动态调优,避免线程数过多导致系统负载飙升。
  2. 杜绝线程泄漏:线程池关闭时必须调用shutdown系列安全关闭接口,等待所有工作线程执行完毕再join回收,严禁直接终止线程池,防止线程资源无法释放、任务未执行完毕。
  3. 避免线程阻塞挂死:工作线程执行的任务中,严禁出现无限循环、死锁、长时间阻塞(如无超时的IO等待、锁等待),一旦单个线程挂死,会持续占用线程池资源,高并发下会快速耗尽所有线程,导致线程池无法处理新任务。
  4. 动态线程池管控上限:缓存线程池需严格设定最大线程数上限,避免突发高并发下无限创建新线程,耗尽系统线程资源,同时合理配置空闲线程超时回收时间,平衡资源占用与响应速度。

任务设计与提交

  1. 任务轻量化设计:线程池适合处理短平快的并发任务,单任务耗时不宜过长,长耗时任务建议拆分或单独创建线程处理,避免长期占用工作线程,阻塞后续任务执行。
  2. 任务内部捕获异常:提交的任务内部必须做好完整的异常捕获处理,严禁任务抛出未捕获异常,否则会导致当前工作线程崩溃退出,动态线程池虽会重建线程,但会造成性能损耗,固定线程池则会直接减少可用线程数量。
  3. 合理使用拒绝策略:队列满、线程池达到最大负载时,需配置对应的拒绝策略,可选择提交线程执行、丢弃任务、抛出异常、丢弃最旧任务等方式,严禁无拒绝策略直接丢弃任务,避免核心业务任务丢失。
  4. 禁止任务间依赖:线程池内的任务严禁相互依赖,尤其是固定线程池、单线程池,任务依赖极易引发死锁,导致线程池整体阻塞,所有任务需设计为无状态、可独立执行。

并发安全与资源管控

  1. 规避死锁风险:任务内部如需使用锁,严禁嵌套锁、循环等待,无锁队列需注意ABA问题(常规线程池任务场景可忽略,复杂指针操作需额外加版本号规避),杜绝锁竞争导致的线程死锁。
  2. 共享资源同步:多个任务同时访问共享变量、全局资源时,必须做好线程同步管控,无锁队列仅保障队列自身线程安全,不负责任务内部共享资源的同步,需额外通过原子操作或轻量级锁处理。
  3. 系统资源负载监控:生产环境使用线程池时,需监控线程数、队列堆积长度、CPU使用率、内存占用等指标,队列持续堆积、CPU负载过高时,及时调整线程数或扩容队列容量,保障系统稳定运行。

线程池生命周期与运维

  1. 单例复用线程池:同一业务模块严禁重复创建多个线程池,建议采用单例模式复用线程池实例,避免多线程池相互抢占系统资源,提升资源利用率。
  2. 优雅关闭线程池:程序退出时,必须主动调用线程池的关闭接口,等待未完成任务执行完毕,再释放线程池资源,防止程序退出时任务丢失、内存泄漏。
  3. 适配编译环境:所有线程池代码基于C++11标准实现,编译时需开启C++11及以上标准支持,Windows平台使用VS2019及以上版本,Linux平台g++编译器版本不低于4.8,确保原子操作、线程库正常运行。