目录
1. MPSC队列概述
2. 非侵入式MPSC队列实现
结构解析
主要方法
构造函数与析构函数
Enqueue(入队)
Dequeue(出队)
3. 侵入式MPSC队列实现
结构解析
主要方法
构造函数与析构函数
Enqueue(入队)
Dequeue(出队)
4. 模板别名 MPSCQueue
5. 测试程序解析
关键点解析
注意事项
6. 工作机制详解
Enqueue过程
Dequeue过程
内存序与原子操作
7. 优缺点与优化
优点
缺点
8. 总结
高性能编程:无锁队列概念https://blog.csdn.net/weixin_43925427/article/details/142203825?fromshare=blogdetail&sharetype=blogdetail&sharerId=142203825&sharerefer=PC&sharesource=weixin_43925427&sharefrom=from_link
1. MPSC队列概述
MPSC队列(Multiple Producers Single Consumer)是一种支持多个生产者线程并发入队,但仅有一个消费者线程出队的队列结构。它在多线程环境中常用于日志系统、任务调度等场景。
无锁实现意味着队列在操作时不依赖于互斥锁等阻塞机制,而是通过原子操作(如 std::atomic
)确保线程安全,从而提升性能和并发性。
2. 非侵入式MPSC队列实现
非侵入式队列意味着消息对象本身不需要嵌入队列的链接指针,队列管理使用外部结构来维护消息之间的链接关系。
结构解析
template<typename T>
class MPSCQueueNonIntrusive
{
public:MPSCQueueNonIntrusive();~MPSCQueueNonIntrusive();// wait-freevoid Enqueue(T* input);bool Dequeue(T*& result);private:struct Node{Node();explicit Node(T* data);T* Data;std::atomic<Node*> Next;};std::atomic<Node*> _head;std::atomic<Node*> _tail;// 禁止拷贝和赋值MPSCQueueNonIntrusive(MPSCQueueNonIntrusive const&) = delete;MPSCQueueNonIntrusive& operator=(MPSCQueueNonIntrusive const&) = delete;
};
-
Node结构体:
- Data:指向实际消息数据的指针。
- Next:原子指针,指向下一个节点。
-
成员变量:
- _head:原子指针,指向队列的头节点。
- _tail:原子指针,指向队列的尾节点。
主要方法
构造函数与析构函数
MPSCQueueNonIntrusive() : _head(new Node()), _tail(_head.load(std::memory_order_relaxed))
{Node* front = _head.load(std::memory_order_relaxed);front->Next.store(nullptr, std::memory_order_relaxed);
}~MPSCQueueNonIntrusive()
{T* output;while (Dequeue(output))delete output;Node* front = _head.load(std::memory_order_relaxed);delete front;
}
-
构造函数:
- 初始化一个哑节点(dummy node),确保队列在开始时为空但有一个有效节点。
head
和tail
都指向这个哑节点。- 设置
front->Next
为nullptr
,表示队列结束。
-
析构函数:
- 依次出队所有剩余的消息并删除。
- 删除哑节点,释放资源。
Enqueue(入队)
void Enqueue(T* input)
{Node* node = new Node(input);Node* prevHead = _head.exchange(node, std::memory_order_acq_rel);prevHead->Next.store(node, std::memory_order_release);
}
-
步骤:
- 创建一个新的节点,存储要入队的消息。
- 使用
exchange
操作将_head
原子地指向新节点,并获取之前的头节点prevHead
。 - 将
prevHead->Next
指向新节点,链接新节点到队列尾部。
-
内存序:
std::memory_order_acq_rel
:确保exchange
操作前后的操作顺序。std::memory_order_release
:确保之前的写操作对其他线程可见。
- 相关内存序概念,请移步同步艺术:原子操作与锁的技术探索
https://blog.csdn.net/weixin_43925427/article/details/142166623?fromshare=blogdetail&sharetype=blogdetail&sharerId=142166623&sharerefer=PC&sharesource=weixin_43925427&sharefrom=from_link
-
特性:
- 多生产者安全:多个生产者可以同时调用
Enqueue
,通过exchange
确保操作的原子性。 - 等待无锁(Wait-Free):生产者在有限步骤内完成入队操作,不会因其他线程阻塞。
- 多生产者安全:多个生产者可以同时调用
Dequeue(出队)
bool Dequeue(T*& result)
{Node* tail = _tail.load(std::memory_order_relaxed);Node* next = tail->Next.load(std::memory_order_acquire);if (!next)return false;result = next->Data;_tail.store(next, std::memory_order_release);delete tail;return true;
}
-
步骤:
- 获取当前尾节点
tail
。 - 读取
tail->Next
指针next
。 - 如果
next
为nullptr
,表示队列为空,返回false
。 - 否则,将
next->Data
赋值给result
。 - 更新
_tail
指针到next
。 - 删除旧的
tail
节点,释放内存。 - 返回
true
,表示成功出队。
- 获取当前尾节点
-
内存序:
std::memory_order_acquire
:确保读取Next
之前的所有写操作对当前线程可见。std::memory_order_release
:确保更新_tail
后的操作顺序。
-
特性:
- 单消费者安全:仅一个消费者线程调用
Dequeue
,无需额外同步。 - 高效出队:在单消费者环境下,出队操作非常高效。
- 单消费者安全:仅一个消费者线程调用
3. 侵入式MPSC队列实现
侵入式队列要求消息对象自身包含用于链接队列的指针,这样队列管理可以直接访问消息对象的链接指针,减少内存分配开销。
结构解析
template<typename T, std::atomic<T*> T::* IntrusiveLink>
class MPSCQueueIntrusive
{
public:MPSCQueueIntrusive();~MPSCQueueIntrusive();void Enqueue(T* input);bool Dequeue(T*& result);private:std::aligned_storage_t<sizeof(T), alignof(T)> _dummy;T* _dummyPtr;std::atomic<T*> _head;std::atomic<T*> _tail;// 禁止拷贝和赋值MPSCQueueIntrusive(MPSCQueueIntrusive const&) = delete;MPSCQueueIntrusive& operator=(MPSCQueueIntrusive const&) = delete;
};
-
成员变量:
- _dummy:用于初始化队列的哑节点。
- _dummyPtr:指向哑节点的指针。
- _head 和 _tail:原子指针,分别指向队列的头和尾。
-
模板参数:
- T:消息类型。
- IntrusiveLink:消息结构体中用于链接的原子指针成员变量。
主要方法
构造函数与析构函数
MPSCQueueIntrusive() : _dummyPtr(reinterpret_cast<T*>(std::addressof(_dummy))), _head(_dummyPtr), _tail(_dummyPtr)
{// _dummy 是通过对齐存储创建的未初始化对象std::atomic<T*>* dummyNext = new (&(_dummyPtr->*IntrusiveLink)) std::atomic<T*>();dummyNext->store(nullptr, std::memory_order_relaxed);
}~MPSCQueueIntrusive()
{T* output;while (Dequeue(output))delete output;
}
-
构造函数:
- 使用
std::aligned_storage_t
创建一个未初始化的哑节点_dummy
,确保其内存对齐。 - 初始化
_head
和_tail
都指向_dummyPtr
。 - 在
_dummyPtr
的IntrusiveLink
成员上构造一个std::atomic<T*>
,并设置为nullptr
。
- 使用
-
析构函数:
- 依次出队所有消息并删除。
- 删除哑节点(不需要,因为是通过
aligned_storage
分配的)。
Enqueue(入队)
void Enqueue(T* input)
{(input->*IntrusiveLink).store(nullptr, std::memory_order_release);T* prevHead = _head.exchange(input, std::memory_order_acq_rel);(prevHead->*IntrusiveLink).store(input, std::memory_order_release);
}
-
步骤:
- 设置
input
消息的IntrusiveLink
为nullptr
,标记为队列末尾。 - 使用
exchange
操作将_head
原子地指向input
,并获取之前的头节点prevHead
。 - 将
prevHead
的IntrusiveLink
指向input
,链接新节点到队列尾部。
- 设置
-
内存序:
std::memory_order_release
和std::memory_order_acq_rel
确保操作的顺序性和可见性。
-
特性:
- 多生产者安全:多个生产者可以同时调用
Enqueue
,通过exchange
确保操作的原子性。 - 等待无锁(Wait-Free):入队操作在有限步骤内完成。
- 多生产者安全:多个生产者可以同时调用
Dequeue(出队)
bool Dequeue(T*& result)
{T* tail = _tail.load(std::memory_order_relaxed);T* next = (tail->*IntrusiveLink).load(std::memory_order_acquire);if (tail == _dummyPtr){if (!next)return false;_tail.store(next, std::memory_order_release);tail = next;next = (next->*IntrusiveLink).load(std::memory_order_acquire);}if (next){_tail.store(next, std::memory_order_release);result = tail;return true;}T* head = _head.load(std::memory_order_acquire);if (tail != head)return false;Enqueue(_dummyPtr);next = (tail->*IntrusiveLink).load(std::memory_order_acquire);if (next){_tail.store(next, std::memory_order_release);result = tail;return true;}return false;
}
-
步骤:
- 获取当前尾节点
tail
。 - 读取
tail->IntrusiveLink
指针next
。 - 如果
tail
是哑节点:- 如果
next
为空,队列为空,返回false
。 - 否则,更新
_tail
指向next
,并再次读取next
。
- 如果
- 如果
next
不为空:- 更新
_tail
指向next
。 - 将
tail
(实际消息)赋值给result
,返回true
。
- 更新
- 否则,读取
_head
指针head
。- 如果
tail
不等于head
,队列仍有消息,返回false
。 - 否则,将
_dummyPtr
入队,尝试再次读取next
。 - 如果
next
不为空,更新_tail
指向next
,将tail
赋值给result
,返回true
。 - 否则,队列为空,返回
false
。
- 如果
- 获取当前尾节点
-
内存序:
- 使用
std::memory_order_acquire
和std::memory_order_release
确保操作的顺序性和可见性。
- 使用
-
特性:
- 单消费者安全:仅一个消费者调用
Dequeue
,无需额外同步。 - 高效出队:在单消费者环境下,出队操作非常高效。
- 单消费者安全:仅一个消费者调用
4. 模板别名 MPSCQueue
template<typename T, std::atomic<T*> T::* IntrusiveLink = nullptr>
using MPSCQueue = std::conditional_t<IntrusiveLink != nullptr, MPSCQueueIntrusive<T, IntrusiveLink>, MPSCQueueNonIntrusive<T>>;
-
功能:
- 根据是否提供
IntrusiveLink
成员指针,选择侵入式或非侵入式实现。 - 如果
IntrusiveLink
不为nullptr
,使用MPSCQueueIntrusive
,否则使用MPSCQueueNonIntrusive
。
- 根据是否提供
-
用法:
- 非侵入式:
MPSCQueue<Count>
- 侵入式:
MPSCQueue<Count, &Count::next>
- 非侵入式:
5. 测试程序解析
#include "MPSCQueue.h"#include <thread>
#include <iostream>// 消息结构体
struct Count {Count(int _v) : v(_v){}int v;
};int main() {MPSCQueue<Count> queue;std::thread pd1([&]() {queue.Enqueue(new Count(100));queue.Enqueue(new Count(200));queue.Enqueue(new Count(300));queue.Enqueue(new Count(400));});std::thread pd2([&]() {queue.Enqueue(new Count(500));queue.Enqueue(new Count(600));queue.Enqueue(new Count(700));queue.Enqueue(new Count(800));});std::thread cs1([&]() {Count* ele;while(queue.Dequeue(ele)) {std::cout << std::this_thread::get_id() << " : pop " << ele->v << std::endl;delete ele;}});pd1.join();pd2.join();cs1.join();return 0;
}
关键点解析
-
消息结构体
Count
:- 包含一个整数成员
v
。 - 注意:当前
Count
结构体不包含链接指针,适用于非侵入式队列实现。
- 包含一个整数成员
-
创建消息队列:
MPSCQueue<Count> queue;
- 使用非侵入式实现,因为未提供
IntrusiveLink
。 MPSCQueueNonIntrusive<Count>
被实例化。
- 使用非侵入式实现,因为未提供
-
生产者线程:
- 两个生产者线程
pd1
和pd2
,分别入队四个不同的Count
消息。 - 生产者通过
Enqueue
方法将消息指针放入队列。
- 两个生产者线程
-
消费者线程:
- 一个消费者线程
cs1
,不断调用Dequeue
获取消息。 - 成功出队后,打印消息内容并删除消息对象。
- 一个消费者线程
-
线程同步:
- 主线程等待所有生产者和消费者线程完成。
注意事项
-
非侵入式使用:
- 消息对象
Count
不包含链接指针,MPSCQueueNonIntrusive
使用内部Node
结构管理链接。 - 消息对象和节点是分离的,队列管理外部节点的链接。
- 消息对象
-
内存管理:
- 生产者通过
new
分配消息对象。 - 消费者在出队后通过
delete
释放消息对象。
- 生产者通过
6. 工作机制详解
Enqueue过程
-
创建节点:
- 生产者通过
new Node(input)
创建一个包含消息数据的节点。 Node
构造函数将Data
指针设置为input
,并初始化Next
为nullptr
。
- 生产者通过
-
交换头指针:
- 使用
_head.exchange(node, std::memory_order_acq_rel)
将_head
原子地指向新节点。 - 获取之前的头节点
prevHead
。
- 使用
-
链接新节点:
- 将
prevHead->Next
原子地指向新节点,完成入队操作。
- 将
关键点:
exchange
操作确保多个生产者在并发情况下能够安全地交换头指针。prevHead->Next.store(node, std::memory_order_release)
确保消息链接的正确性。
Dequeue过程
-
获取当前尾节点:
- 消费者读取
_tail
指针,指向当前尾节点tail
。
- 消费者读取
-
读取下一个节点:
- 读取
tail->Next
指针next
,指向下一个节点。
- 读取
-
判断队列是否为空:
- 如果
next
为nullptr
,队列为空,返回false
。 - 否则,继续出队。
- 如果
-
获取消息数据:
- 将
next->Data
赋值给result
,即出队的消息数据。
- 将
-
更新尾指针:
- 使用
_tail.store(next, std::memory_order_release)
将尾指针更新到next
节点。
- 使用
-
删除旧节点:
- 删除旧的
tail
节点,释放内存。
- 删除旧的
关键点:
- 仅有一个消费者线程,避免了出队操作的竞争。
std::memory_order_acquire
和std::memory_order_release
确保内存操作的顺序性和可见性。
内存序与原子操作
-
内存序:
- 确保不同线程之间对共享数据的读写顺序性和可见性。
std::memory_order_acq_rel
:用于exchange
操作,确保前后的操作有序。std::memory_order_release
:用于设置Next
指针,确保之前的写操作对其他线程可见。std::memory_order_acquire
:用于读取Next
指针,确保读取到的数据是最新的。
-
原子操作:
std::atomic
确保对指针的读写操作是原子的,防止数据竞争和不一致性。exchange
、load
和store
是关键的原子操作,确保线程安全。
7. 优缺点与优化
优点
-
高并发性:
- 支持多个生产者同时入队,提升生产效率。
-
无锁实现:
- 通过原子操作实现无锁,避免传统锁带来的性能瓶颈和上下文切换开销。
-
简单高效:
- 非侵入式实现简单易用,侵入式实现减少了内存分配开销。
-
等待无锁:
- 入队操作在有限步骤内完成,减少了阻塞和等待。
缺点
-
单消费者限制:
- 仅支持单一消费者,无法扩展到多消费者场景。
-
内存管理复杂:
- 需要手动管理消息对象的内存,容易引发内存泄漏或悬挂指针。
-
侵入式限制:
- 侵入式实现要求消息对象包含链接指针,限制了消息结构的设计。
-
无阻塞出队:
- 出队操作在队列为空时返回
false
,消费者需要自行处理空队列逻辑。
- 出队操作在队列为空时返回
8. 总结
MPSCQueue 实现通过 非侵入式 和 侵入式 两种方式,实现了多生产者单消费者的无锁队列。这种实现利用了原子操作和合适的内存序,确保了在高并发环境下的线程安全和高效性。
- 非侵入式实现 适用于消息结构体不希望嵌入链接指针的场景,通过内部
Node
结构管理消息链接。 - 侵入式实现 通过嵌入链接指针,减少了内存分配开销,适用于可以修改消息结构体的场景。
这种 MPSC 实现适合应用于需要多个生产者高效入队,且仅有单一消费者处理消息的场景,如日志系统、任务调度等。
参考:
0voice · GitHub