您的位置:首页 > 娱乐 > 明星 > 高性能编程:无锁队列----MPSCQueue代码实践

高性能编程:无锁队列----MPSCQueue代码实践

2024/10/5 22:32:18 来源:https://blog.csdn.net/weixin_43925427/article/details/142258099  浏览:    关键词:高性能编程:无锁队列----MPSCQueue代码实践

目录

1. MPSC队列概述

2. 非侵入式MPSC队列实现

结构解析

主要方法

构造函数与析构函数

Enqueue(入队)

Dequeue(出队)

3. 侵入式MPSC队列实现

结构解析

主要方法

构造函数与析构函数

Enqueue(入队)

Dequeue(出队)

4. 模板别名 MPSCQueue

5. 测试程序解析

关键点解析

注意事项

6. 工作机制详解

Enqueue过程

Dequeue过程

内存序与原子操作

7. 优缺点与优化

优点

缺点

8. 总结



高性能编程:无锁队列概念icon-default.png?t=O83Ahttps://blog.csdn.net/weixin_43925427/article/details/142203825?fromshare=blogdetail&sharetype=blogdetail&sharerId=142203825&sharerefer=PC&sharesource=weixin_43925427&sharefrom=from_link

1. MPSC队列概述

MPSC队列(Multiple Producers Single Consumer)是一种支持多个生产者线程并发入队,但仅有一个消费者线程出队的队列结构。它在多线程环境中常用于日志系统、任务调度等场景。

无锁实现意味着队列在操作时不依赖于互斥锁等阻塞机制,而是通过原子操作(如 std::atomic)确保线程安全,从而提升性能和并发性。

2. 非侵入式MPSC队列实现

非侵入式队列意味着消息对象本身不需要嵌入队列的链接指针,队列管理使用外部结构来维护消息之间的链接关系。

结构解析
template<typename T>
class MPSCQueueNonIntrusive
{
public:MPSCQueueNonIntrusive();~MPSCQueueNonIntrusive();// wait-freevoid Enqueue(T* input);bool Dequeue(T*& result);private:struct Node{Node();explicit Node(T* data);T* Data;std::atomic<Node*> Next;};std::atomic<Node*> _head;std::atomic<Node*> _tail;// 禁止拷贝和赋值MPSCQueueNonIntrusive(MPSCQueueNonIntrusive const&) = delete;MPSCQueueNonIntrusive& operator=(MPSCQueueNonIntrusive const&) = delete;
};
  • Node结构体

    • Data:指向实际消息数据的指针。
    • Next:原子指针,指向下一个节点。
  • 成员变量

    • _head:原子指针,指向队列的头节点。
    • _tail:原子指针,指向队列的尾节点。
主要方法
构造函数与析构函数
MPSCQueueNonIntrusive() : _head(new Node()), _tail(_head.load(std::memory_order_relaxed))
{Node* front = _head.load(std::memory_order_relaxed);front->Next.store(nullptr, std::memory_order_relaxed);
}~MPSCQueueNonIntrusive()
{T* output;while (Dequeue(output))delete output;Node* front = _head.load(std::memory_order_relaxed);delete front;
}
  • 构造函数

    • 初始化一个哑节点(dummy node),确保队列在开始时为空但有一个有效节点。
    • headtail 都指向这个哑节点。
    • 设置 front->Nextnullptr,表示队列结束。
  • 析构函数

    • 依次出队所有剩余的消息并删除。
    • 删除哑节点,释放资源。
Enqueue(入队)
void Enqueue(T* input)
{Node* node = new Node(input);Node* prevHead = _head.exchange(node, std::memory_order_acq_rel);prevHead->Next.store(node, std::memory_order_release);
}
  • 步骤

    1. 创建一个新的节点,存储要入队的消息。
    2. 使用 exchange 操作将 _head 原子地指向新节点,并获取之前的头节点 prevHead
    3. prevHead->Next 指向新节点,链接新节点到队列尾部。
  • 内存序

    • std::memory_order_acq_rel:确保 exchange 操作前后的操作顺序。
    • std::memory_order_release:确保之前的写操作对其他线程可见。
  • 相关内存序概念,请移步同步艺术:原子操作与锁的技术探索icon-default.png?t=O83Ahttps://blog.csdn.net/weixin_43925427/article/details/142166623?fromshare=blogdetail&sharetype=blogdetail&sharerId=142166623&sharerefer=PC&sharesource=weixin_43925427&sharefrom=from_link
  • 特性

    • 多生产者安全:多个生产者可以同时调用 Enqueue,通过 exchange 确保操作的原子性。
    • 等待无锁(Wait-Free):生产者在有限步骤内完成入队操作,不会因其他线程阻塞。
Dequeue(出队)
bool Dequeue(T*& result)
{Node* tail = _tail.load(std::memory_order_relaxed);Node* next = tail->Next.load(std::memory_order_acquire);if (!next)return false;result = next->Data;_tail.store(next, std::memory_order_release);delete tail;return true;
}
  • 步骤

    1. 获取当前尾节点 tail
    2. 读取 tail->Next 指针 next
    3. 如果 nextnullptr,表示队列为空,返回 false
    4. 否则,将 next->Data 赋值给 result
    5. 更新 _tail 指针到 next
    6. 删除旧的 tail 节点,释放内存。
    7. 返回 true,表示成功出队。
  • 内存序

    • std::memory_order_acquire:确保读取 Next 之前的所有写操作对当前线程可见。
    • std::memory_order_release:确保更新 _tail 后的操作顺序。
  • 特性

    • 单消费者安全:仅一个消费者线程调用 Dequeue,无需额外同步。
    • 高效出队:在单消费者环境下,出队操作非常高效。

3. 侵入式MPSC队列实现

侵入式队列要求消息对象自身包含用于链接队列的指针,这样队列管理可以直接访问消息对象的链接指针,减少内存分配开销

结构解析
template<typename T, std::atomic<T*> T::* IntrusiveLink>
class MPSCQueueIntrusive
{
public:MPSCQueueIntrusive();~MPSCQueueIntrusive();void Enqueue(T* input);bool Dequeue(T*& result);private:std::aligned_storage_t<sizeof(T), alignof(T)> _dummy;T* _dummyPtr;std::atomic<T*> _head;std::atomic<T*> _tail;// 禁止拷贝和赋值MPSCQueueIntrusive(MPSCQueueIntrusive const&) = delete;MPSCQueueIntrusive& operator=(MPSCQueueIntrusive const&) = delete;
};
  • 成员变量

    • _dummy:用于初始化队列的哑节点。
    • _dummyPtr:指向哑节点的指针。
    • _head_tail:原子指针,分别指向队列的头和尾。
  • 模板参数

    • T:消息类型。
    • IntrusiveLink:消息结构体中用于链接的原子指针成员变量。
主要方法
构造函数与析构函数
MPSCQueueIntrusive() : _dummyPtr(reinterpret_cast<T*>(std::addressof(_dummy))), _head(_dummyPtr), _tail(_dummyPtr)
{// _dummy 是通过对齐存储创建的未初始化对象std::atomic<T*>* dummyNext = new (&(_dummyPtr->*IntrusiveLink)) std::atomic<T*>();dummyNext->store(nullptr, std::memory_order_relaxed);
}~MPSCQueueIntrusive()
{T* output;while (Dequeue(output))delete output;
}
  • 构造函数

    • 使用 std::aligned_storage_t 创建一个未初始化的哑节点 _dummy,确保其内存对齐。
    • 初始化 _head_tail 都指向 _dummyPtr
    • _dummyPtrIntrusiveLink 成员上构造一个 std::atomic<T*>,并设置为 nullptr
  • 析构函数

    • 依次出队所有消息并删除。
    • 删除哑节点(不需要,因为是通过 aligned_storage 分配的)。
Enqueue(入队)
void Enqueue(T* input)
{(input->*IntrusiveLink).store(nullptr, std::memory_order_release);T* prevHead = _head.exchange(input, std::memory_order_acq_rel);(prevHead->*IntrusiveLink).store(input, std::memory_order_release);
}
  • 步骤

    1. 设置 input 消息的 IntrusiveLinknullptr,标记为队列末尾。
    2. 使用 exchange 操作将 _head 原子地指向 input,并获取之前的头节点 prevHead
    3. prevHeadIntrusiveLink 指向 input,链接新节点到队列尾部。
  • 内存序

    • std::memory_order_releasestd::memory_order_acq_rel 确保操作的顺序性和可见性。
  • 特性

    • 多生产者安全:多个生产者可以同时调用 Enqueue,通过 exchange 确保操作的原子性。
    • 等待无锁(Wait-Free):入队操作在有限步骤内完成。
Dequeue(出队)
bool Dequeue(T*& result)
{T* tail = _tail.load(std::memory_order_relaxed);T* next = (tail->*IntrusiveLink).load(std::memory_order_acquire);if (tail == _dummyPtr){if (!next)return false;_tail.store(next, std::memory_order_release);tail = next;next = (next->*IntrusiveLink).load(std::memory_order_acquire);}if (next){_tail.store(next, std::memory_order_release);result = tail;return true;}T* head = _head.load(std::memory_order_acquire);if (tail != head)return false;Enqueue(_dummyPtr);next = (tail->*IntrusiveLink).load(std::memory_order_acquire);if (next){_tail.store(next, std::memory_order_release);result = tail;return true;}return false;
}
  • 步骤

    1. 获取当前尾节点 tail
    2. 读取 tail->IntrusiveLink 指针 next
    3. 如果 tail 是哑节点:
      • 如果 next 为空,队列为空,返回 false
      • 否则,更新 _tail 指向 next,并再次读取 next
    4. 如果 next 不为空:
      • 更新 _tail 指向 next
      • tail(实际消息)赋值给 result,返回 true
    5. 否则,读取 _head 指针 head
      • 如果 tail 不等于 head,队列仍有消息,返回 false
      • 否则,将 _dummyPtr 入队,尝试再次读取 next
      • 如果 next 不为空,更新 _tail 指向 next,将 tail 赋值给 result,返回 true
      • 否则,队列为空,返回 false
  • 内存序

    • 使用 std::memory_order_acquirestd::memory_order_release 确保操作的顺序性和可见性。
  • 特性

    • 单消费者安全:仅一个消费者调用 Dequeue,无需额外同步。
    • 高效出队:在单消费者环境下,出队操作非常高效。

4. 模板别名 MPSCQueue

template<typename T, std::atomic<T*> T::* IntrusiveLink = nullptr>
using MPSCQueue = std::conditional_t<IntrusiveLink != nullptr, MPSCQueueIntrusive<T, IntrusiveLink>, MPSCQueueNonIntrusive<T>>;
  • 功能

    • 根据是否提供 IntrusiveLink 成员指针,选择侵入式或非侵入式实现。
    • 如果 IntrusiveLink 不为 nullptr,使用 MPSCQueueIntrusive,否则使用 MPSCQueueNonIntrusive
  • 用法

    • 非侵入式MPSCQueue<Count>
    • 侵入式MPSCQueue<Count, &Count::next>

5. 测试程序解析

#include "MPSCQueue.h"#include <thread>
#include <iostream>// 消息结构体
struct Count {Count(int _v) : v(_v){}int v;
};int main() {MPSCQueue<Count> queue;std::thread pd1([&]() {queue.Enqueue(new Count(100));queue.Enqueue(new Count(200));queue.Enqueue(new Count(300));queue.Enqueue(new Count(400));});std::thread pd2([&]() {queue.Enqueue(new Count(500));queue.Enqueue(new Count(600));queue.Enqueue(new Count(700));queue.Enqueue(new Count(800));});std::thread cs1([&]() {Count* ele;while(queue.Dequeue(ele)) {std::cout << std::this_thread::get_id() << " : pop " << ele->v << std::endl;delete ele;}});pd1.join();pd2.join();cs1.join();return 0;
}
关键点解析
  1. 消息结构体 Count

    • 包含一个整数成员 v
    • 注意:当前 Count 结构体不包含链接指针,适用于非侵入式队列实现。
  2. 创建消息队列

    MPSCQueue<Count> queue;
    
    • 使用非侵入式实现,因为未提供 IntrusiveLink
    • MPSCQueueNonIntrusive<Count> 被实例化。
  3. 生产者线程

    • 两个生产者线程 pd1pd2,分别入队四个不同的 Count 消息。
    • 生产者通过 Enqueue 方法将消息指针放入队列。
  4. 消费者线程

    • 一个消费者线程 cs1,不断调用 Dequeue 获取消息。
    • 成功出队后,打印消息内容并删除消息对象。
  5. 线程同步

    • 主线程等待所有生产者和消费者线程完成。
注意事项
  • 非侵入式使用

    • 消息对象 Count 不包含链接指针,MPSCQueueNonIntrusive 使用内部 Node 结构管理链接。
    • 消息对象和节点是分离的,队列管理外部节点的链接。
  • 内存管理

    • 生产者通过 new 分配消息对象。
    • 消费者在出队后通过 delete 释放消息对象。

6. 工作机制详解

Enqueue过程
  1. 创建节点

    • 生产者通过 new Node(input) 创建一个包含消息数据的节点。
    • Node 构造函数将 Data 指针设置为 input,并初始化 Nextnullptr
  2. 交换头指针

    • 使用 _head.exchange(node, std::memory_order_acq_rel)_head 原子地指向新节点。
    • 获取之前的头节点 prevHead
  3. 链接新节点

    • prevHead->Next 原子地指向新节点,完成入队操作。

关键点

  • exchange 操作确保多个生产者在并发情况下能够安全地交换头指针。
  • prevHead->Next.store(node, std::memory_order_release) 确保消息链接的正确性。
Dequeue过程
  1. 获取当前尾节点

    • 消费者读取 _tail 指针,指向当前尾节点 tail
  2. 读取下一个节点

    • 读取 tail->Next 指针 next,指向下一个节点。
  3. 判断队列是否为空

    • 如果 nextnullptr,队列为空,返回 false
    • 否则,继续出队。
  4. 获取消息数据

    • next->Data 赋值给 result,即出队的消息数据。
  5. 更新尾指针

    • 使用 _tail.store(next, std::memory_order_release) 将尾指针更新到 next 节点。
  6. 删除旧节点

    • 删除旧的 tail 节点,释放内存。

关键点

  • 仅有一个消费者线程,避免了出队操作的竞争。
  • std::memory_order_acquirestd::memory_order_release 确保内存操作的顺序性和可见性。
内存序与原子操作
  • 内存序

    • 确保不同线程之间对共享数据的读写顺序性和可见性。
    • std::memory_order_acq_rel:用于 exchange 操作,确保前后的操作有序。
    • std::memory_order_release:用于设置 Next 指针,确保之前的写操作对其他线程可见。
    • std::memory_order_acquire:用于读取 Next 指针,确保读取到的数据是最新的。
  • 原子操作

    • std::atomic 确保对指针的读写操作是原子的,防止数据竞争和不一致性。
    • exchangeloadstore 是关键的原子操作,确保线程安全。

7. 优缺点与优化

优点
  1. 高并发性

    • 支持多个生产者同时入队,提升生产效率。
  2. 无锁实现

    • 通过原子操作实现无锁,避免传统锁带来的性能瓶颈和上下文切换开销。
  3. 简单高效

    • 非侵入式实现简单易用,侵入式实现减少了内存分配开销。
  4. 等待无锁

    • 入队操作在有限步骤内完成,减少了阻塞和等待。
缺点
  1. 单消费者限制

    • 仅支持单一消费者,无法扩展到多消费者场景。
  2. 内存管理复杂

    • 需要手动管理消息对象的内存,容易引发内存泄漏或悬挂指针。
  3. 侵入式限制

    • 侵入式实现要求消息对象包含链接指针,限制了消息结构的设计。
  4. 无阻塞出队

    • 出队操作在队列为空时返回 false,消费者需要自行处理空队列逻辑。

8. 总结

MPSCQueue 实现通过 非侵入式侵入式 两种方式,实现了多生产者单消费者的无锁队列。这种实现利用了原子操作和合适的内存序,确保了在高并发环境下的线程安全和高效性。

  • 非侵入式实现 适用于消息结构体不希望嵌入链接指针的场景,通过内部 Node 结构管理消息链接。
  • 侵入式实现 通过嵌入链接指针,减少了内存分配开销,适用于可以修改消息结构体的场景。

这种 MPSC 实现适合应用于需要多个生产者高效入队,且仅有单一消费者处理消息的场景,如日志系统、任务调度等。

参考:

0voice · GitHub

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com