AtomicHashMap
github 仓库:https://github.com/facebook/folly/tree/main/folly
文档:https://github.com/facebook/folly/blob/main/folly/docs/AtomicHashMap.md
本文只简单介绍下 folly库的 AtomicHashMap 实现思想,详细设计可以直接看源码。
AtomicHashMap 是基于 AtomicHasArray 实现的。
本质上,AtomicHashArray
就是一个静态数组,通过CAS操作实现并发插入、删除操作。关键在于如何封装使得对元素操作原子化。
AtomicHashArray,为 key 提供了三个标志元素 empty_key、locked_key、erased_key;
- 初始化,所有元素的 key = empty_key;
- AtomicHashArray 采用固定长度的设计,不支持扩容;
- 插入 key = target 的元素,
- 通过 CAS(key, empty_key, locked_key) 方式锁定一个空元素;
- 更新完之后,key 的状态从 locked_key 变成新的 target 值;
- key 发生冲突时,使用开放寻址继续探测数组中的下一个位置是否可用(没有使用挂链表的方式);
- 如果数组满了,直接返回失败;
- 更新,
- 如果当前元素的 key 已经等于 target,说明 key 已经存在;
- 删除 key = target 的元素,
- 通过 CAS(key, target, erased_key) 方式锁定要删除的元素;
- 此时 key = erased_key(被标记已删除),但 value 还在,防止其它线程仍在使用 value;
- 那什么时候可以释放 value 呢,并同时把 key 从 erased_key 改成 empty_key(key 可以被复用)?一种方式是提供一个 erase() 方法由使用方决定何时释放,另一种方式是使用类似风险指针的方式来延迟 GC;
插入的代码(简化后的流程),
insertInternal(LookupKeyT key_in, ArgTs&&... vCtorArgs) {size_t idx = keyToAnchorIdx<LookupKeyT, LookupHashFcn>(key_in);for (;;) {value_type* cell = &cells_[idx];KeyT currentKey = acquireLoadKey(*cell);if (currentKey == kEmptyKey_) {// 可以插入新键// CAS 修改成功if (compare_exchange_strong(currentKey, kEmptyKey_, kLockedKey_)) {return SimpleRetT(idx, true);}} if (currentKey == kLockedKey_) {// 等待解锁detail::atomic_hash_spin_wait([&] { return currentKey != kLockedKey_; });} if (LookupEqualFcn()(currentKey, key_in)) {// 插入的 key 已存在,不会覆盖return SimpleRetT(idx, false);} else if (currentKey == kLockedKey_ || currentKey == kEmptyKey_) {// 重试continue;}// 继续探测下一个位置idx = ProbeFcn()(idx, numProbes, capacity_);} }
AtomicHashArray 类包含 3 个原子类型:
std::atomic<int64_t> isFull_; // Used by insertInternal std::atomic<int64_t> numErases_; // Successful key erases std::atomic<KeyT> *cellkeyptr_; // cell中key的指针。
AtomicHashMap 类
std::atomic<SubMap*>subMaps_[kNumSubMaps_]; // 指向AtomicHashArray, kNumSubMaps_ = 16 std::atomic<uint32_t> numMapsAllocated_; // 记录AtomicHashArray数目
AtomicHashMap 包含一个子映射数组(subMaps_),
- 插入 key,
- 先计算使用哪个子映射,然后计算在该子映射的下标,如果该位置可用,直接插入,如果不可用,换一个子映射继续尝试插入;
- 如果所有子映射都不可用,判断当前子映射个数是否超过上限,没有的话,会尝试新建一个子映射;
- 如果子映射个数达到上限,插入失败;
- 查找,需要遍历所有子映射;