Managing memory in lock-free data structures
正如之前我们在上一章,一开始讨论的那样,为了防止 dangling pointer,我们将 5. delete old head 跳过,那这样会造成内存泄露
The basic problem is that you want to free a node, but you can’t do so until you’re
sure there are no other threads that still hold pointers to it.
Reference-counted reclamation machinery
所以一个解决思路就是在每一次 pop()
的时候,使用一个 atomic 的值记录当前有多少个线程正在 pop, 通过判断 atomic 的值,如果发现只有一个线程在进行 pop,那么这个时候就可以很安全地进行删除了,这种实现就叫做 reference-counted,实现的逻辑如下
// Listing 7.4 Reclaiming nodes when no threads are in pop()
// Listing 7.5 The reference-counted reclamation machinerytemplate<typenam T>
class lock_free_stack
{private:std::atomic<unsigned> threads_in_pop;std::atomic<node*> to_be_deleted;static void delete_node(node* nodes) {while(nodes) {node* next = nodes->next;delete nodes;nodes = next;}}void try_reclaim(node* old_head){if (threads_in_pop == 1){node* nodes_to_delete = to_be_deleted.exchange(nullptr); if (!--threads_in_pop) {delete_node(nodes_to_delete); // 把之前积攒的 nodes 删除}else if (nodes_to_delete) // 发现有新的 thread 在进行 pop(),将 node 添加到 to_be_deleted{chain_pending_nodes(nodes_to_delete);}delete old_head; }else {chain_pending_nodes(old_head);--threads_in_pop;}}// 构建 to_be_deletd 链表void chain_pending_nodes(node* nodes){node* last = nodes;while (node* const next = last->next) // follow the next painter chain to the end{last = next;}chain_pending_nodes(nodes, last);}void chain_pending_nodes(node* first, node* last){last->next = to_be_deletd;while (!to_be_deletd.compare_exchange_weak(last->next, first)); // loop to guarantee that last->next is correct}void chain_pending_node(node* n){chain_pending_nodes(n, n);}public:std::shared_ptr<T> pop(){++threads_in_pop;node* old_head = head.load();while (old_head &&!head.compare_exchange_weak(old_head, old_head->next));// 以上的 compare_exchange_weak 能确保只有一个线程能得到这个 old_headstd::shared_ptr<T> res;if (old_head) {res.swap(old_head->data); // swap 不觉得很神奇吗?我觉得我现在是无法掌握的}// 虽然每一个线程得到的 old_head 是不一样的,但是 try_reclain 可能是多线程运行的, 传递不同的 old_headtry_reclaim(old_head);return res;}
}
然后就是在看这个代码的时候
void try_reclaim(node* old_head){if (threads_in_pop == 1){node* nodes_to_delete = to_be_deleted.exchange(nullptr); if (!--threads_in_pop) {delete_node(nodes_to_delete); // 把之前积攒的 nodes 删除}else if (nodes_to_delete) // 发现有新的 thread 在进行 pop(),将 node 添加到 to_be_deleted{chain_pending_nodes(nodes_to_delete);}delete old_head; }else {chain_pending_nodes(old_head);--threads_in_pop;}}
我当时有一些疑问
-
为什么对于
delete old_head
还需要再判断一次if (threads_in_pop == 1)
对于说能够来到此方法那么已经是已经经过了while (old_head && !head.compare_exchange_weak(old_head, old_head->next));
只有一个线程能拿到 old_head 来到这里- 这是因为在 1-4 这个阶段其实还可能会有其他线程在 1 拿到了 old_head 只是它卡在了 4 而已,所以必须要通过
if (threads_in_pop == 1)
判断没有其他线程拿着 old_head 才可以删除
- 这是因为在 1-4 这个阶段其实还可能会有其他线程在 1 拿到了 old_head 只是它卡在了 4 而已,所以必须要通过
-
为什么对于
delete_node(nodes_to_delete);
他还要再次地判断if (!--threads_in_pop)
成功时,而不能像删除 old_head 一样在if (threads_in_pop == 1)
后直接就删除?- 这是因为,虽然在进入原子化交换 to_be_deleted 和 nodes_to_delete 前能确定当前只有一个线程,而且在
if (threads_in_pop == 1)
的时候我们能保证在to_be_delete
内部的 node 都没有指针指向可以删除;但是等到真正交换的时候,可能在这段时间一个新的线程 pop, 并且将它获得的 old_head 放入了 to_be_deleted, 这个时候我们没有办法确定对于这个新线程加入的 old_head 是否会有其他的线程拿这这个 old_head 的指针在 1-4 部分使用,因此,我们必须要再次通过if (!--threads_in_pop)
确认确实没有其他线程时才可以进行删除
- 这是因为,虽然在进入原子化交换 to_be_deleted 和 nodes_to_delete 前能确定当前只有一个线程,而且在
Hazard pointers
相比较与 reference-counted 记录全局的 threads 的数量来判断说是否可以删除元素, hazard pointers 记录了当前被 referenced 的资源,通过记录每一个资源有谁指向它,来判断是否可以删除一个元素,记录的颗粒度更细一点.
类似的,在 hazard_pointers 的实现上,也有一个和 reference-counted 中相似的 to_be_deleted
的 atomic 链表,在这里为 std::atomic<data_to_reclaim*> nodes_to_reclaim
.
在对应 hazard_pointers 记录的存储上
This location must be visible to all threads, and you need one of these for each thread that might access the data structure.
一个简单的实现是一个固定长度的存储 pair<id, pointer>
的数组
// Listing 7.6 An implementation of pop() using hazard pointers
std::shared_ptr<T> pop() {std::atomic<void*>& hp = get_hazard_pointer_for_current_thread();node* old_head = head.load();do {node* tmp;do { // loop until you've set the hazard pointer to headtmp = old_head;hp.store(old_head);old_head = head.load();} while (old_head != tmp);} while (old_head && !head.compare_excange_strong(old_head, old_head->next));hp.store(nullptr); // clear hazard pointer once you've finishedstd::shared_ptr<T> res;if (old_head) {res.swap(old_head->data);// chech for hazard pointers referencing a node before you delete itif (outstanding_hazard_piointers_for(old_head)) {reclain_later(old_head);} else {delete old_head;}delete_nodes_with_no_hazards();}return res;
}// Listing 7.7 A simple implementation of get_hazard_pointer_for_current_thread()
unsigned const max_hazard_pointers = 100;
struct hazard_pointer {std::atomic<std::thread::id> id;std::atomic<void*> pointer;
};
hazard_pointer hazard_pointers[max_hazard_pointers];
class hp_owner {hazard_pointer* hp;public:hp_owner(hp_owner const&) = delete;hp_owner operator=(hp_owner const&) = delete;hp_owner() : hp(nullptr) {for (unsigned i = 0; i < max_hazard_pointers; ++i) {std::thread::id old_id;// try to claim ownership of a hazard pointerif (hazard_pointers[i].id.compare_excange_strong(old_id, std::this_thread::get_id())) {hp = &hazard_pointers[i];break;}}if (!hp) {throw std::runtime_error("No hazard pointers available");}}std::atomic<void*>& get_pointer() { return hp->pointer; }~hp_owner() {hp->pointer.store(nullptr);hp->id.store(std::thread::id());}
};std::atomic<void*>& get_hazard_pointer_for_current_thread() {thread_local static hp_owner hazard; // each thread has its own hazard pointerreturn hazard.get_pointer();
}bool outstanding_hazard_pointers_for(void* p) {for (unsigned i = 0; i < max_hazard_pointers; ++i) {if (hazard_pointers[i].pointer.load() == p) {return true;}}return false;
}// Listing 7.8 A Simple implementation of the reclaim functions
template <typename T>
void do_delete(void* p) {delete static_cast<T*>(p);
}
struct data_to_reclaim {void* data;std::function<void(void*)> deleter;data_to_reclaim* next;template <typename T>data_to_reclaim(T* p) : data(p).deleter(&do_delete<T>),next(0) {}!data_to_reclaim() { deleter(data); }
};std::atomic<data_to_reclaim*> nodes_to_reclaim;
void add_to_reclaim_list(data_to_reclaim* node) {node->next = nodes_to_reclaim.load();while (!nodes_to_reclaim.compare_exchange_weak(node->next, node));
}template <typename T>
void reclaim_later(T* data) {add_to_reclaim_list(new data_to_reclaim(data));
}void delete_nodes_with_no_hazards() {data_to_reclaim* current = nodes_to_reclaim.exchange(nullptr);while (current) {data_to_reclaim* const next = current->next;if (!outstanding_hazard_pointers_for(current->data)) {delete current;} else {add_to_reclaim_list(current);}current = next;}
}
其实整体的逻辑是很直观的
- 申请 hazard_pointer 这部分在
hp_owner()
构造函数中实现,他遍历整个 hazard_pointers 数组,然后使用 atomic compare/exchange 部分通过设置对应元素的 id 为自己的 id 将其空间设定为自己的 - 在 pop 时,通过检查
outstanding_hazard_pointers_for(void *p)
来判断是否有指向 p 的 reference,如果有,对应元素不能立即被删除,而是应该调用reclaim_later()
将对应资源放入到std::atomic<data_to_reclaim*> nodes_to_reclaim;
delete_nodes_with_no_hazards()
就是删除对应在nodes_to_reclaim
中没有其他线程 ref 此资源的方法,他就是遍历nodes_to_reclaim
中的所有元素,然后每一个元素都调用outstanding_hazard_pointers_for()
检查是否此资源有其他 ref,如果没有就可以删除
重点
在看这里的时候,有 2 个点需要注意
- 在获得 head 并且 hp.store(head) 这中间,可能会出现 rece condistion,所以这部分一定要使用 while loop
do {node* tmp;do { // loop until you've set the hazard pointer to headtmp = old_head;hp.store(old_head);old_head = head.load();} while (old_head != tmp);} while (old_head && !head.compare_excange_strong(old_head, old_head->next));
并且这里使用 compare_excange_strong
是因为虽然 compare_excange_week
性能会好一点,但是他会有虚假错位,造成 hp.store()
不必要的重试,因此,在这里使用 compare_excange_strong
- 第二个是一个问题,就是
outstanding_hazard_pointers_for()
当时我的问题是,他是一个遍历的操作,遍历所有的 hazard 数组然后判断是否内部有 thread 指向此资源,我就在想,如果说如果在遍历的过程中,前面的某一个 thread 原本并没有引用这个资源,但是在 outstanding 遍历到后面后它引用了这个资源,那这个时候删除不是会出现问题吗?
答案是,这个问题并不会存在,因为在 pop()
中,实际上可能的 ref 情况是多个线程获得了此 head,但是只会有一个线程真正成功地 pop 出这个元素
我们所要等的指向让对应资源的 thread 就是那些还困在 1,3 部分的 thread,而在真正 pop() 后,对应的元素放入到了 nodes_to_reclaim 之后,是不会还有新的线程指向这个元素
因此,如果在 outstanding_hazard_pointers_for()
检查到了一个资源被引用的数量为 0 之后,他是不会不会再被增加了