在 03. 无锁栈的内存管理 我们讨论了两种检测是否节点可以被删除的方法:reference count 和 hazard pointers 方法,但是事实上是它们的管理方式比较复杂,需要考虑的比较多,而说到生命周期的管理,我们很自然而然地会想到类似 std::shared_ptr<>
的使用,所以在这里我们尝试使用一些方法简化无数数据结构的内存管理
使用 lock-free std::shared_ptr<>
自动管理
// Listing 7.9 A lock-free stack using lock-free std::shared_ptr
template <typename T>
class lock_free_stack {private:struct node {// data 使用 shared_ptr 是为了异常返回安全// 在 push 的时候就构造出来,然后在 pop() 时就直接返回一个 shared_ptr// 因为返回一个 shared_ptr 是不会抛出异常的std::shared_ptr<T> data;// 使用 shared_ptr 自动管理 stack 的 node 的生命周期// 避免使用裸指针std::shared_ptr<node> next;node(T const& data_) : data(std::make_shared<T>(data_)) {}};// 使用 shared_ptr 自动管理 stack 的 node 的生命周期// 避免使用裸指针std::shared_ptr<node> head;public:// 实现 push 的逻辑和之前的一致,基本的 3 步走// 只是 node 使用 shared_ptr 进行包装void push(T const& data) {std::shared_ptr<node> const new_node = std::make_shared<node>(data);new_node->next = std::atomic_load(&head);while (!std::atomic_compare_exchange_weak(&head, &new_node->next, new_node));}std::shared_ptr<T> pop() {std::shared_ptr<node> old_head = std::atomic_load(&head);while (old_head &&!std::atomic_compare_exchange_weak(&head, &old_head, std::atomic_load(&old_head->next)));if (old_head) {std::atomic_store(&old_head->next, std::shared_ptr<node>());return old_head->data;}return std::shared_ptr<T>();}~lock_free_stack() { while (pop()); }
};
这段代码其实重点就是在 pop()
部分,其实和之前的差不多,就是对应逻辑的 4 步走;但是值得注意的部分是
if (old_head) {std::atomic_store(&old_head->next, std::shared_ptr<node>());return old_head->data;
}
当取出了对应的节点后,要将对应节点的 next 设置为 nullptr,这点怎么理解呢?在原文中有写到
Note the need to clear the next pointer from the popped node in order to avoid the potential for deeply nested destruction of nodes when the last std::shared_ptr referencing a given node is destroyed.
考虑以下场景
A -> B -> C -> ...
A 引用 B,B 运用 C ... 在这个时候,如果 A 被 pop 出来,在析构 A 的时候,会调用对应 shared_ptr<B>
的析构,因为 A 保存着对 B 的唯一引用,所以这个时候连带着 B 也会被析构,而同理,后续的 C 等等也都会被调用析构函数,这样的链式引用可能会造成说调用析构函数次数太多,栈溢出。
好的,我们理解了这种链式引用的问题,但是你还是觉得不对劲,你认为在当前的场景下,如果 A 被 pop 出,A 对应的对 B 的引用会被 -1, 但是按照逻辑,这个时候 head 会指向 B,所以即使 A 析构的也不应该会造成这样链式引用多次析构的问题...
但事实是,因为这是一个多并发的场景,其实你并不能保证 head 对 B 的引用是长期有效的,即使说可能在之后另一个线程就将 B pop 出了,所以在一定程度上还是会有这样的问题潜在存在,因此显式地将 next 设置为 nullptr 切断对应的链式连接是必要的,也可以看做是防御性编程。
使用 std::experimental::atomic_shared_ptr<>
上面的使用 std::shared_ptr<>
在实现上确实变得清晰简洁,但事实上事情并不会那么轻松,因为以上的实现是需要依赖于 shared_ptr
的原子操作是 lock-free 的才能说是实现了一个 lock-free 的 stack,而对于 std::atomic<>
说,如果使用 shared_ptr
作为类型,那么他很有可能会退化成为使用锁来实现。
这是因为 std::atomic<T>
只能用于那些 复制构造和赋值操作是 "trivial" 或者支持原子性的 类型。
在 C++ 里,一个类型如果它的复制操作(copy constructor / copy assignment)是编译器默认生成的、没有做复杂的事情,比如没有分配内存、没有引用计数这种“副作用”,那就叫 trivial copy。
但std::shared_ptr<T>
的复制操作不是这样简单的:
每次你复制一个shared_ptr
,它都会 修改引用计数(reference count)来记录有多少指针指向同一个对象。这个行为是必须受保护的(通常需要加锁或原子操作),否则在多线程下就会崩。
shared_ptr<T>
的复制不是平凡的,它需要管理引用计数,所以 直接用在 std::atomic<shared_ptr<T>>
会出错或不安全(很多编译器甚至直接禁止这么写)。
因此,C++ 的 Concurrency TS(技术规范)里引入了一个新类型:
std::experimental::atomic_shared_ptr<T>
,它是专门为 shared_ptr 提供 原子操作封装 的工具。你不需要手动调用 atomic_load() 或 atomic_store(),这个类内部已经处理好了引用计数的线程安全问题。如果你的平台支持 Concurrency TS,那你可以使用它来简洁而高效地实现
// Listing 7.10 Stack implementation using std::experimental::atomic_shared_ptr<>
template <typename T>
class lock_free_stack {private:struct node {std::shared_ptr<T> data;std::experimental::atomic_shared_ptr<node> next;node(T const& data_) : data(std::make_shared<T>(data_)) {}};std::experimental::atomic_shared_ptr<node> head;public:void push(T const& data) {std::shared_ptr<node> const new_node = std::make_shared<node>(data);new_node->next = head.load();while (!head.compare_exchange_weak(new_node->next, new_node));}std::shared_ptr<T> pop() {std::shared_ptr<node> old_head = head.load();while (old_head && !head.compare_exchange_weak(old_head, old_head->next.load()));if (old_head) {old_head->next = std::shared_ptr<node>();return old_head->data;}return std::shared_ptr<T>();}~lock_free_stack() { while (pop()); }
};
然后,在文章中还提到了使用 split reference counts 来实现可以参考文中 Listing 7.12 + Listing 7.13 的实现