nonblocking 的3种类型
事实上 nonblocking != lock-free, 将 nonblocking 分为 3 个类型
- Obstruction-Free — If all other threads are paused, then any given thread will complete its operation in a bounded number of steps.
- Lock-Free — If multiple threads are operating on a data structure, then after a bounded number of steps one of them will complete its operation.
- Wait-Free — Every thread operating on a data structure will complete its opera tion in a bounded number of steps, even if other threads are also operating on the data structure.
简单分级来说就是 Obstruction-Free 是一种弱实现,在所有线程都暂停的情况下,指定的某一个线程会在有限的步骤下完成;Lock-Free 是说在多个线程操作同一个数据结构的时候,能保证有一个线程可以在有限的步骤下完成;Wait-Free 是一种更严格的 Lock-Free,因为他要求在多线程操作下,所有线程都能在有限的步骤后完成
Lock-free algorithms with these loops can result in one thread being subject to starvation. If another thread performs operations with the “wrong” timing, the other thread might make progress but the first thread continually has to retry its operation.
这里值得注意的是所有要求实现的前提是在 a bounded number of steps 的情况下,如果完成需要花费的步骤非常非常大也不会算是 Wait-Free,这也就是为什么 Wait-Free 很难实现的原因了,因为如果要所有的线程都能在有限的步骤后实现,那么就是要求线程之间不会因为彼此的操作而产生影响,不会因为另一个线程的行为而产生类似重试的操作,这种能够避免 starvation 的就是 Wait-Free. 这是非常困难的
Lock-Free 的好处与坏处
基本上使用 Lock-Free 数据结构是为了达到 2 种好处
- enable maximum concurrency - 为了获得最大的并发量
- robustness - 为了鲁棒性
因为是 Lock-Free 所以 some threads 在每一步都可以获得一些进展,而对于鲁棒性是指当使用 lock 的时候,如果一个线程在获得锁的时候出现了问题而没有释放锁,那么整个数据结构 is broken forever. 而如果是 Lock-Free 的情况下,其中一个线程出现了问题并不会产生像锁那样的影响
但是同时,Lock-Free 也会带来很多问题,因为实现 Lock-Free 依赖 atomic + memeory order 使其变得 visible,因为不依赖锁,所以对应的实现逻辑也会变得更复杂,对 atomic 的操作也会更多,对应的内存屏障,可见性的问题等等也都会带来很多的性能消耗,因此在某些场景下,Lock-Free 会比锁带来更多的消耗,所以不能将 Lock-Free 直接和高效划等号,只有在某些特定的场景下才合适
另外,虽然不会发生 deadlocks 但是 live-lock is possible,一个直观的 live-lock 的理解是一个只能允许一个人通过的通道,一个想要从左边到右边,一个想要从右边到左边,并且碰巧在中间碰到的情况下,双方都会重新回到原点重新尝试,而在重新尝试的过程中,依旧双方还是可能会被卡住,从而不断重试的场景。但是同样 live-lock 是 short-lived 因为她是依赖于特定的调度情况
Design lock free stack
当我们实际一个 Lock-Free stack 的时候,我们要确保的一个点是:一旦一个值被添加到了 stack,它就应该能够立即被另外一个线程安全地获取到,且只有一个线程能够获取
最简单的 Lock-Free stack 的实现就是使用链表结构,然后 head 指向栈顶元素,元素之间的顺序使用指针维护
Push() without lock
对于添加一个元素的正常逻辑一般是 3 个步骤
- 创建一个 Node 对象
- 将 Node 对象的 next 指针设置成为 head->next
- 将 head 指向创建的 Node 对象
这样的逻辑在单线程的情况下是OK的,但是在多线程的情况下,如果在线程 a 在执行 2,3 的中间有另外一个线程 b 成功添加了一个元素,这个时候如果线程 a 再修改 head 指针,那么对应 b 添加的元素久不在了...
因此,避免这种问题的方法就是在执行 3 的时候使用 atomic compare/exchange opertion, 只有当确保 head 没有被修改的情况下才能更改 head
template<typenam T>
class lock_free_stack
{private:struct node {T data,node* next;node(T const& data_) : data(data_){}};std::atomic<node*> head;public:void push(T const*& data){node* const new_node = new node(data);new_node->next = head.load();while (!head.compare_exchange_weak(new_node->next, new_node));}
}
同样,这里还需要注意的一点是当创建的 Node 成为 head 之前,一定要保证说 Node 的创建是完成的,因为只有这样才能满足说,一旦一个值被添加到了 stack,它就应该能够立即被另外一个线程安全地获取到
然后一个细节是当运行!head.compare_exchange_weak(new_node->next, new_node)
部分的时候,new_node->next
是会被更新成为最新的值,所以之后是不用 reload 的
So, you might not have a pop() operation yet, but you can quickly check push() for compliance with the guidelines. The only place that can throw an exception is the construction of the new node B, but this will clean up after itself, and the list hasn’t been modified yet, so that’s perfectly safe. Because you build the data to be stored as part of the node, and you use compare_exchange_weak() to update the head pointer, there are no problematic race conditions here. Once the compare/exchange succeeds, the node is on the list and ready for the taking. There are no locks, so there’s no possibility of deadlock, and your push() function passes with flying colors.
这段话我觉得也蛮有意思的,检验 Lock-Free 的实现,它讨论了
- exception-safe
- race conditions
- deadlock
Pop() that leaks nodes
对于取出一个元素的正常逻辑一般是 5 个步骤
- 获取 head 的值
- 获取 head->next
- head 指向 head->next
- 将第一步获得到的 Node 的值返回
- 删除第一步获取到的 head
首先讨论的第一个问题是,因为是无锁的,有可能出现有多个指针在第一步的时候获取到相同的 head,而如果这时候有一个线程运行到 5 将其 delete,那么其他线程就会出现 dereferencing a dangling pointer 的问题,因此我们先不删除,先没有第 5 步 (CppCon 2017: Fedor Pikus “Read, Copy, Update, then what? RCU for non-kernel programmers” 好像确实,就像是在这里讨论RCU,说很多问题都是在 delete 的时候出现)
然后同样,就像是 Push()
部分一样,也是使用 atomic-compare/exchange 的方式来修改 head,这样不仅解决了和 push 相似的内容,也可以保证一个值只能被一个线程能够返回,因为在第 3 步,使用 atomic-compare/exchange 实现的话,只有一个能够成功,也因此也只有一个线程能进入到第 4 步,然后返回对应的值,不会出现多个 thead 从 head 中获得相同的 data 并返回
// listing 7.3 A lock-free stack that leaks nodes
#include <atomic>template<typenam T>
class lock_free_stack
{private:struct node{std::shared_ptr<T> data;node* next;node (T const& data_) : data(std::make_shared<T>(data_)){}};std::atomic<node*> head;public:void push(T const& data){node* const new_node = new node(data);new_node->next = head.load();while (!head.compare_exchange_weak(new_node->next, new_node));}std::shared_ptr<T> pop(){node* old_head = head.load();while (old_head &&!head.compare_exchange_weak(old_head, old_head->next));return old_head ? old_head->data : stad::shared_ptr<T>();}
}
Exception-safety issue
这部分主要讲了在多线程数据结构下"如何安全的值"的问题,尤其是在可能抛异常的情况下
整个问题是这样的:
- 如果通过"返回值"的方式返回对象,那么在拷贝这个值的过程中如果抛出了异常,返回值就丢了
std::optional<Data> pop() {std::lock_guard<std::mutex> lock(mutex);if (stack.empty()) return std::nullopt;Data value = stack.top(); // 1stack.pop(); // ← 已经把值移除!return value; // ← 拷贝在这里可能抛异常
}
如果是在 1 抛出异常的情况下,值不会丢,但是一般是在 return 拷贝的时候发生异常,这就有问题了
- 如果说将结构使用引用参数穿出去(不是 return ),这样如果在拷贝的过程中抛出异常,那么对象还没有被栈弹出,所以栈的状态还没有变,值没丢
bool pop(Data& out) {std::lock_guard<std::mutex> lock(mutex);if (stack.empty()) return false;out = stack.top(); // ← 如果这里抛异常,程序终止,没有进行 pop()stack.pop(); // ← 值还是在栈里return true;
}
- 而在 lock-free 的情况下,你是没有办法的,因为你必须要在确定你是唯一拿到这个值之后你才可以返回,而本身"确定你是唯一拿到这个值"这件事情,是使用 atomic-compare/exchange 的方式,这个时候对应的 head 以及被设置了,即这个值已经被 pop 出了,而此时如果在复制的时候出现异常,这个值就丢了
结论:在这样的情况下,传引用已经没有了他的效果,所以还不如返回值
If you want to return the value safely, you have to use the other option
from chapter 3: return a (smart) pointer to the data value.
If you return a smart pointer, you can return nullptr to indicate that there’s no
value to return, but this requires that the data be allocated on the heap. If you do the
heap allocation as part of pop(), you’re still no better off, because the heap allocation
might throw an exception. Instead, you can allocate the memory when you push() the
data onto the stack—you have to allocate memory for the node anyway. Returning
std::shared_ptr<> won’t throw an exception, so pop() is now safe. Putting all this
together gives the following listing.
这里就是说,如果你真的想要 return value safely, 可以返回一个智能指针
template<typename T>
class lock_free_stack
{
private:struct node{std::shared_ptr<T> data; node* next;node(T const& data_): data(std::make_shared<T>(data_)) {}};std::atomic<node*> head;
public:void push(T const& data){node* const new_node=new node(data);new_node->next=head.load();while(!head.compare_exchange_weak(new_node->next,new_node));}std::shared_ptr<T> pop(){node* old_head=head.load();while(old_head && !head.compare_exchange_weak(old_head,old_head->next));return old_head ? old_head->data : std::shared_ptr<T>(); }
};
这里有几个点,就是我能理解说就是要将对应的value存在heap,而且在push中去allocation是好的,因为在 push 中如果new node
失败也没有关系,因为并没有影响heap,但是在看对应代码的实现的时候我发现他把 node
的 T*
改成了 shared_ptr<T>
, 在 push()
的时候创建 shared_ptr
。这我就不太能理解了,就是说只要data是在heap应该应该就可以了,那原本 node*
也是在heap的啊,那为什么不是在 pop 的时候再创建对应的 shared_ptr ?
- 是因为考虑
make_shared
失败的情况吗 - 另外一个原因我能想到的就是
shared_ptr
如果多次创建那么就是引用块也不一样了 - 然后就是利用
shared_ptr
的复制是 noexcept 的:它只是增加引用计数,并不会分配内存或做别的可能抛异常的事情
Managing memory in lock-free data structures
正如之前我们一开始讨论的那样,为了防止 dangling pointer,我们将 5. delete old head 跳过,那这样会造成内存泄露
The basic problem is that you want to free a node, but you can’t do so until you’re
sure there are no other threads that still hold pointers to it.
所以一个解决思路就是在每一次 pop()
的时候,使用一个 atomic 的值记录当前有多少个线程正在 pop, 通过判断 atomic 的值,如果发现只有一个线程在进行 pop,那么这个时候就可以很安全地进行删除了,实现的逻辑如下
// Listing 7.4 Reclaiming nodes when no threads are in pop()
// Listing 7.5 The reference-counted reclamation machinerytemplate<typenam T>
class lock_free_stack
{private:std::atomic<unsigned> threads_in_pop;std::atomic<node*> to_be_deleted;static void delete_node(node* nodes) {while(nodes) {node* next = nodes->next;delete nodes;nodes = next;}}void try_reclaim(node* old_head){if (threads_in_pop == 1){node* nodes_to_delete = to_be_deleted.exchange(nullptr); if (!--threads_in_pop) {delete_node(nodes_to_delete); // 把之前积攒的 nodes 删除}else if (nodes_to_delete) // 发现有新的 thread 在进行 pop(),将 node 添加到 to_be_deleted{chain_pending_nodes(nodes_to_delete);}delete old_head; }else {chain_pending_nodes(old_head);--threads_in_pop;}}// 构建 to_be_deletd 链表void chain_pending_nodes(node* nodes){node* last = nodes;while (node* const next = last->next) // follow the next painter chain to the end{last = next;}chain_pending_nodes(nodes, last);}void chain_pending_nodes(node* first, node* last){last->next = to_be_deletd;while (!to_be_deletd.compare_exchange_weak(last->next, first)); // loop to guarantee that last->next is correct}void chain_pending_node(node* n){chain_pending_nodes(n, n);}public:std::shared_ptr<T> pop(){++threads_in_pop;node* old_head = head.load();while (old_head &&!head.compare_exchange_weak(old_head, old_head->next));// 以上的 compare_exchange_weak 能确保只有一个线程能得到这个 old_headstd::shared_ptr<T> res;if (old_head) {res.swap(old_head->data); // swap 不觉得很神奇吗?我觉得我现在是无法掌握的}// 虽然每一个线程得到的 old_head 是不一样的,但是 try_reclain 可能是多线程运行的, 传递不同的 old_headtry_reclaim(old_head);return res;}
}
然后就是在看这个代码的时候
void try_reclaim(node* old_head){if (threads_in_pop == 1){node* nodes_to_delete = to_be_deleted.exchange(nullptr); if (!--threads_in_pop) {delete_node(nodes_to_delete); // 把之前积攒的 nodes 删除}else if (nodes_to_delete) // 发现有新的 thread 在进行 pop(),将 node 添加到 to_be_deleted{chain_pending_nodes(nodes_to_delete);}delete old_head; }else {chain_pending_nodes(old_head);--threads_in_pop;}}
我当时有一些疑问
-
为什么对于
delete old_head
还需要再判断一次if (threads_in_pop == 1)
对于说能够来到此方法那么已经是已经经过了while (old_head && !head.compare_exchange_weak(old_head, old_head->next));
只有一个线程能拿到 old_head 来到这里- 这是因为在 1-4 这个阶段其实还可能会有其他线程在 1 拿到了 old_head 只是它卡在了 4 而已,所以必须要通过
if (threads_in_pop == 1)
判断没有其他线程拿着 old_head 才可以删除
- 这是因为在 1-4 这个阶段其实还可能会有其他线程在 1 拿到了 old_head 只是它卡在了 4 而已,所以必须要通过
-
为什么对于
delete_node(nodes_to_delete);
他还要再次地判断if (!--threads_in_pop)
成功时,而不能像删除 old_head 一样在if (threads_in_pop == 1)
后直接就删除?- 这是因为,虽然在进入原子化交换 to_be_deleted 和 nodes_to_delete 前能确定当前只有一个线程,而且在
if (threads_in_pop == 1)
的时候我们能保证在to_be_delete
内部的 node 都没有指针指向可以删除;但是等到真正交换的时候,可能在这段时间一个新的线程 pop, 并且将它获得的 old_head 放入了 to_be_deleted, 这个时候我们没有办法确定对于这个新线程加入的 old_head 是否会有其他的线程拿这这个 old_head 的指针在 1-4 部分使用,因此,我们必须要再次通过if (!--threads_in_pop)
确认确实没有其他线程时才可以进行删除
- 这是因为,虽然在进入原子化交换 to_be_deleted 和 nodes_to_delete 前能确定当前只有一个线程,而且在