无锁队列 SPSC

无锁队列 SPSC Queueicon-default.png?t=N7T8https://www.cnblogs.com/sinkinben/p/17949761/spsc-queue

在多线程编程中,一个著名的问题是生产者-消费者问题 (Producer Consumer Problem, PC Problem)。

对于这类问题,通过信号量加锁 (https://www.cnblogs.com/sinkinben/p/14087750.html) 来设计 RingBuffer 是十分容易实现的,但欠缺性能。

考虑一个特殊的场景,生产者和消费者均只有一个 (Single Producer Single Consumer, SPSC),在这种情况下,我们可以设计一个无锁队列来解决 PC 问题。

0. Background

考虑以下场景:在一个计算密集型 (Computing Intensive) 和延迟敏感的 for 循环当中,每次循环结束,需要打印当前的迭代次数以及计算结果。

void matrix_compute()
{for (i = 0 to n){// code of computing...// print i and result of computingstd::cout << ...}
}

在这种情况下,如果使用简单的 std::cout 输出,由于 I/O 的性质,将会造成严重的延迟 (Latency)。

一个直观的解决办法是:将 Log 封装为一个字符串,传递给其他线程,让其他线程打印该字符串,实现异步的 Logging 。

1. Lock-free SPSC Queue

此处使用一个 RingBuffer 来实现队列。

由于是 SPSC 型的队列,队列头部 head 只会被 Consumer 写入,队列尾部 tail 只会被 Producer 写入,所以 SPSC Queue 可以是无锁的,但需要保证写入的原子性。

template <class T> class spsc_queue
{private:std::vector<T> m_buffer;std::atomic<size_t> m_head;std::atomic<size_t> m_tail;public:spsc_queue(size_t capacity) : m_buffer(capacity + 1), m_head(0), m_tail(0) {}inline bool enqueue(const T &item);inline bool dequeue(T &item);
};

对于一个 RingBuffer 而言,判空与判满的方法如下:

  • Empty 的条件:head == tail
  • Full 的条件:(tail + 1) % N == head

因此,enqueue 和 dequeue 可以是以下的实现:

inline bool enqueue(const T &item)
{const size_t tail = m_tail.load(std::memory_order_relaxed);const size_t next = (tail + 1) % m_buffer.size();if (next == m_head.load(std::memory_order_acquire))return false;m_buffer[tail] = item;m_tail.store(next, std::memory_order_release);return true;
}inline bool dequeue(T &item)
{const size_t head = m_head.load(std::memory_order_relaxed);if (head == m_tail.load(std::memory_order_acquire))return false;item = m_buffer[head];const size_t next = (head + 1) % m_buffer.size();m_head.store(next, std::memory_order_release);return true;
}

std::memory_order 的使用说明:std::memory_order - cppreference.com

Benchmark 计算 SPSC Queue 的吞吐量:

Mean:   29,158,897.200000 elements/s 
Median: 29,178,822.000000 elements/s 
Max:    29,315,199 elements/s 
Min:    28,995,515 elements/s 

Benchmark 的计算方法为:

  • Producer 和 Consumer 分别执行 1e8 次 enqueue 和 dequeue ,计算队列为空所耗费的总时间 t, 1e8 / t 即为吞吐量。
  • 上述过程执行 10 次,最终计算 mean, median, min, max 的值。

2. Remove cache false sharing

什么是 Cache False Sharing? 参考 Architecture of Modern CPU 的 Exercise 一节。

int *a = new int[1024]; 
void worker(int idx)
{for (int j = 0; j < 1e9; j++)a[idx] = a[idx] + 1;
}

考虑以下程序:

  • P1: 开启 2 线程,执行 worker(0), worker(1)
  • P2: 开启 2 线程,执行 worker(0), worker(16)

P2 的执行速度会比 P1 快,现代 CPU 的 Cache Line 大小一般为 64 字节,由于 a[0], a[1] 位于同一个 CPU Core 的同一个 Cache Line,每次写入都会带来数据竞争 (Data Race) ,触发缓存和内存的同步(参考 MESI 协议),而 a[0], a[16] 之间相差了 64 字节,不在同一个 Cache Line,所以避免了这个问题。

所以,对于上述的 SPSC Queue,可以进行以下改进:

template <class T>
class spsc_queue
{
private:std::vector<T> m_buffer;alignas(64) std::atomic<size_t> m_head;alignas(64) std::atomic<size_t> m_tail;
};

这里的 alignas(64) 实际上改为 std::hardware_constructive_interference_size 更加合理,因为 Cache Line 的大小取决于具体 CPU 硬件的实现,并不总是为 64 字节。

#ifdef __cpp_lib_hardware_interference_size
using std::hardware_constructive_interference_size;
using std::hardware_destructive_interference_size;
#else
// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
constexpr std::size_t hardware_constructive_interference_size = 64;
constexpr std::size_t hardware_destructive_interference_size = 64;
#endif

Benchmark 结果:

Mean:   38,993,940.400000 elements/s 
Median: 39,027,123.000000 elements/s 
Max:    39,253,946 elements/s 
Min:    38,624,197 elements/s 

3. Remove useless memory access

在使用 spsc_queue 的时候,通常会有以下形式的代码:

spsc_queue sq(1024);
// Producer keep spinning
int x = 233;
while (!sq.enqueue(x)) {}

而在 dequeue/enqueue 中,存在判空/判满的代码:

inline bool enqueue(const T &item)
{const size_t tail = m_tail.load(std::memory_order_relaxed);const size_t next = (tail + 1) % m_buffer.size();if (next == m_head.load(std::memory_order_acquire))return false;// ...
}

每次执行 m_head.load,Producer 线程的 CPU 都会访问一次 m_head 所在的内存,但实际上触发该条件的概率较小(因为在实际的场景下, Producer/Consumer 都是计算密集型,否则根本不需要无锁的数据结构)。在判空/判满的时候,可以去 “离 CPU 更近” 的 Cache 去获取 m_head 的值。

template <class T>
class spsc_queue
{
private:std::vector<T> m_buffer;alignas(hardware_constructive_interference_size) std::atomic<size_t> m_head;alignas(hardware_constructive_interference_size) std::atomic<size_t> m_tail;alignas(hardware_constructive_interference_size) size_t cached_head;alignas(hardware_constructive_interference_size) size_t cached_tail;
};inline bool enqueue(const T &item)
{const size_t tail = m_tail.load(std::memory_order_relaxed);const size_t next = (tail + 1) % m_buffer.size();if (next == cached_head){cached_head = m_head.load(std::memory_order_acquire);if (next == cached_head)return false;}// ...
}

Benchmark 结果:

Mean:   79,740,671.300000 elements/s 
Median: 79,838,314.000000 elements/s 
Max:    80,044,793 elements/s 
Min:    79,241,180 elements/s 

4. Summary

Github: GitHub - sinkinben/lock-free-queue: Lock free spsc-queue (single producer and single consumer).

3 个版本的 spsc_queue 的吞吐量比较(均值,中位数,最大值,最小值)。在优化 Cache False Sharing 和优先从 Cache 读取 head, tail 之后,可得到 x2 的提升。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/329757.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

windows安装nvm以及nvm常用命令

目录 1.什么是nvm以及为啥要用nvm 1.什么是nvm 2.为什么要用nvm 2.安装nvm 1. 下载 2. 安装 1.双击解压后的文件,nvm-setup.exe 2.同意 3.安装路径 4.下一步&#xff0c;这里有建议改成自己的文件夹&#xff0c;这个是用来存储通过nvm切换node后版本的存储路径 5.安装…

MongoDB复制集原理

复制集高可用 复制集选举 MongoDB 的复制集选举使用 Raft 算法&#xff08;https://raft.github.io/&#xff09;来实现&#xff0c;选举成功的必要条件是大多数投票节点存活。在具体的实现中&#xff0c;MongoDB 对 raft 协议添加了一些自己的扩展&#xff0c;这包括&#x…

YOLOv5改进 | 2023注意力篇 | MSDA多尺度空洞注意力(附多位置添加教程)

一、本文介绍 本文给大家带来的改进机制是MSDA(多尺度空洞注意力)发表于今年的中科院一区(算是国内计算机领域的最高期刊了),其全称是"DilateFormer: Multi-Scale Dilated Transformer for Visual Recognition"。MSDA的主要思想是通过线性投影得到特征图X的相应查…

实现在一个文件夹中找到特定名称特点格式的文件

当你要在一个文件夹中查找特定名称和格式的文件时&#xff0c;你可以使用 Python 的 os 和 fnmatch 模块。以下是一个简单的脚本示例&#xff0c;它可以在指定目录中查找文件&#xff1a; import os import fnmatchdef find_files(directory, pattern):"""在指…

c# 学习笔记 - 枚举

文章目录 1. 枚举1.1 枚举结构梳理1.2 枚举完整代码1.3 枚举知识点补充 2. 迭代两种命名空间接口3. yield语句 1. 枚举 1.1 枚举结构梳理 结构图   上图内容可能依旧不通俗易懂&#xff0c;这里使用最简明的话语告诉大家实现方式. foreach语句就是集合的遍历操作&#xff0c…

车辆运动学方程推导和代码实现

文章目录 1. 运动学方程2. 模型实现 1. 运动学方程 自行车模型&#xff08;Bicycle Model&#xff09;是车辆数字化模型中最常见的一种运动学模型。其除了可以反映车辆的一些基础特性外&#xff0c;更重要的是简单易用。通常情况下我们会把车辆模型简化为二自由度的自行车模型…

源码编译部署篇(二)源码编译milvus成功后如何启动standalone并调试成功!

Milvus启动和调试 0 前言1 Milvus启动【问题描述】出现Aborted问题【问题分析】【解决方法】安装Pulsar服务执行单机启动命令解决监听端口号 2 Milvus调试编写launch.json验证单例调试成功 3 遇到的问题汇总问题1问题2:Permission denied 0 前言 由于Milvus官方文档只提及如何…

Windows:笔记本电脑设置休眠教程

前言 不知道大家在使用【Windows】笔记本有没有这个习惯&#xff0c;我会把他的电池选项的【休眠】设置进行打开。因为作为我们开发人员电脑一般是一周关一次机&#xff0c;有时候一个月关一次机。这时候【休眠】功能就给我们提供了一个好处&#xff0c;我们选择了【休眠】后电…

web期末作业网页设计——JavaScript

目录 一.作品简介 二.网页效果 首页 花语 登录界面 注册界面 三.网页代码 首页 登录界面 注册界面 视频界面 一.作品简介 网站系统文件种类包含&#xff1a;html网页结构文件、css网页样式文件、js网页特效文件、images网页图片文件。 网页作品代码简单&#xff…

书生·浦语大模型实战营 Lesson 1

书生浦语大模型全链路开源体系 书生浦语大模型开源历程 书生浦语大模型系列 从模型到应用 数据 预训练 微调 部署

如何写html邮件 —— 参考主流outook、gmail、qq邮箱渲染邮件过程

文章目录 ⭐前言⭐outlook渲染邮件⭐gmail邮箱渲染邮件⭐qq邮箱渲染邮件 ⭐编写html邮件&#x1f496;table表格的属性&#x1f496;文本&#x1f496;图片&#x1f496;按钮&#x1f496;背景图片 ⭐总结⭐结束 ⭐前言 大家好&#xff0c;我是yma16&#xff0c;本文分享关于 …

Android WiFi 连接

Android WiFi 连接 1、设置中WiFi显示2、WiFi 连接流程2.1 获取PrimaryClientModeManager2.2 ClientModeImpl状态机ConnectableState2.3 ISupplicantStaNetworkCallback 回调监听 3、 简要时序图4、原生低层驱动5、关键日志 1、设置中WiFi显示 Android WiFi基础概览 packages/a…