高并发内存池
- 【项目】高并发内存池
- 项目介绍
- 这个项目做的是什么?
- 内存池相关知识
- 池化技术
- 内存池
- malloc
- 定长内存池的实现
- 高并发内存池整体框架设计
- ThreadCache
- 对齐规则
- 封装FreeList类
- 封装thread cache类
- TLS无锁访问
- CenctralCache
- 整体设计
- 页号规定
- span结构
- SpanList结构
- 封装central cache类
- PageCache
- 整体设计
- 封装page cache类
- ThreadCache回收内存
- CentralCache回收内存
- PageCache回收内存
- 细节优化
- 当内存大于256KB时的申请问题
- 释放内存
- 定长内存池组件----脱离malloc
- 高并发环境对比malloc测试
- 性能瓶颈分析
- 根据性能瓶颈优化----基数树
- 使用基数树优化代码
- 项目源码
【项目】高并发内存池
项目介绍
这个项目做的是什么?
当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称 Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存 分配相关的函数(malloc、free)。
tcmalloc: TCMalloc (google-perftools) 是用于优化C++写的多线程应用,比glibc 2.3的malloc快。这个模块可以用来让MySQL在高并发下内存占用更加稳定。
项目相关知识:C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等
内存池相关知识
池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。 在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务 器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠 状态。
内存池
通常情况下,程序员习惯直接使 用 new、delete、malloc、free 等API申请分配和释放内存,这样导致的后果是:当程序长时间 运行时,由于所申请内存块的大小不定,频繁使用时会造成大量的内存碎片从而降低程序和操作系统的性能。
内存池(Memory Pool) 是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接 向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操 作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
内存池解决的问题
内存池主要解决的当然是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。那么什么是内存碎片呢?
内碎片和外碎片
上面我们讲的外碎片问题。外部碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请 需求。
内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。比如我们申请6字节的空间,通过对齐规则,我们申请的是一块8字节的空间,这样还有2字节的空间也被申请了但并没有被使用,这2字节浪费的空间就是内碎片
malloc
C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的, 而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套, linux gcc用的glibc中的ptmalloc。
更多知识可以参考文章:如何设计内存池
定长内存池的实现
作为程序员(C/C++)我们知道申请内存使用的是malloc,malloc其实就是一个通用的大众货,什么场景下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能。
因此我们可以模仿malloc的做法,对特定的场景做出特定的设计,设计一个定长内存池,为我们了解内存池机制的同时也为后面高并发内存池项目的实现提供一个重要的组件,使我们的项目脱离malloc。
向堆直接申请内存
向堆直接申请空间有以下方法:
Windows下:VirtualAlloc
Linux下: brk和mmap
我们规定,每次直接向堆申请内存时,以页为单位进行申请,并且规定 2^13=8KB为一页。
#ifdef _WIN32
#include <Windows.h>
#else
//Linux
#endif
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE,PAGE_READWRITE);
#else // linux下brk mmap等
#endifif (ptr == nullptr)throw std::bad_alloc();return ptr;
}
通过条件编译(我们以windows为例)封装向堆申请内存的函数,后面直接调用此函数即可
定长内存池的设计
//定长的实现可以使用非类型模板参数
template<size_t N>
class ObjectPool{};
//也可以通过计算传入类型的大小来实现定长
template<class T>
class ObjectPool
{
private:char* _memory = nullptr;//管理直接向堆申请的大块内存空间size_t _leftBytes = 0;//大块内存空间剩余的字节数void* _freeList = nullptr;//归还时链接的自由链表的头
};
memory用来指向申请的大块内存的起始地址,随着大块内存被切走,memory也要不断更新
_freeList的作用
_freeList用来管理被归还的内存块,用一个链表将归还的内存管理起来,但此链表不需要用STL的链表来链接,而是模仿链表结构,将每次归还回来的内存块的起始地址存放在上一个内存块中,最先归还的就存放在_freeList中,而想要存放地址,则需要取出此内存块的一部分用作指针来指向某块地址
不同平台下存放下一个内存块的地址
在32位平台下和64位平台下,指针的大小是不一样的,因此若编写代码时,我们要让代码能在不同平台下都能够正常运行不会出错
static void*& NextObj(void* obj)
{return *(void**)obj;
}
定长内存池具体实现
#pragma once#include "Common.h"#ifdef _WIN32
#include <Windows.h>
#elif
#endif // !_WIN32//定长内存池,传入大小,适配出对应大小的内存池
//template <size_t N>
//class ObjectPool
//{
//};template<class T>
class ObjectPool
{
public:T* New(){ T* obj = nullptr;//有还回来的内存,优先使用//规定归还链表中,头4(8)内存存放下一个归还内存的头地址if (_freeList){void* next = *(void**)_freeList;obj = (T*)_freeList;_freeList = next;}else{//最开始或者被拿完了再去向系统申请if (_leftBytes < sizeof(T)){_leftBytes = 128 * 1024;_memory = (char*)SystemAlloc(_leftBytes >> 13);if (_memory == nullptr){throw std::bad_alloc();}}obj = (T*)_memory;//_memory += sizeof(T);size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += objSize;_leftBytes -= objSize;}//显示调用T的构造函数初始化new(obj)T;return obj;}void Delete(T* obj){//显示调用析构函数清理对象obj->~T();//32和62位的程序指针大小不同//*(int*)obj=nullptr 只对32位下4字节指针有用//头插*(void**)obj = _freeList;_freeList = obj;}
private:char* _memory = nullptr;//定长内存池size_t _leftBytes = 0;//被切分剩余字节数void* _freeList = nullptr;//归还时链接的自由链表的头
};
高并发内存池整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。
- 性能问题。
- 多线程环境下,锁竞争问题。
- 内存碎片问题。
高并发内存池整体框架
concurrent memory pool主要由以下3个部分构成:
- thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
- central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
- page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache 会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片 的问题。
ThreadCache
定长内存池只支持固定大小的内存的申请与释放,因此只需要用一个自由链表来管理切分固定大小内存块,而我们需要设计一个能够申请各种大小的内存,因此就需要多个固定的定长内存池组成一个非固定的大内存池,因此就需要多个链表。因此将thread cache设计成哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。这也是高并发内存池高效体现之处
对齐规则
thread cache支持小于等于256KB内存大小的申请,但若每个字节数的内存块都用一个自由链表进行管理的话,那这个开销是非常大的,达到256*1024----大约20万个自由链表,于是我们可以通过对齐规则做出一些牺牲,减小开销。
比如说,我们按照8字节对齐,那么当某一个线程申请1-8之间的内存时,就会自动对齐到8字节,并返回8字节大小的内存块给用户使用。9-16之间的字节大小就自动对齐到16,以此类推。这样一来,在对齐规则的影响下, 一定会有部分空间没有被使用到,举个例子,我们申请了4字节空间的内存大小,但被对齐规则对齐到了8字节,但有4字节没有被使用到,造成浪费,这多出来的没有被使用的4字节就是内碎片。
对齐规则的规定
如果只按照8字节内存对齐,那么thread cache下哈希桶的数量也需要达到256 * 1024 / 8 = 32768个桶,开销仍是很大,因此我们起始可以在不同范围内来按照不同的对齐数对齐,这样便可以缩少桶的数量,只需控制好内碎片的浪费率在一定范围内,就不会有很大影响,因此我们规定了一下的对齐方式:
申请字节的大小范围 | 对齐数 | 哈希桶的下标 |
---|---|---|
[1,128] | 8byte对齐 | freelist[0,16) |
[128+1,1024] | 16byte对齐 | freelist[16,72) |
[1024+1,8*1024] | 128byte对齐 | freelist[72,128) |
[8 *1024+1,64 *1024] | 1024byte对齐 | freelist[128,184) |
[64 * 1024+1,256 *1024] | 8*1024byte对齐 | freelist[184,208) |
这样一来,桶的数量就被控制在了208个,对比20万个桶来说,开销是非常小的。
我们再来算一下内碎片的浪费率:
这样就将内碎片控制在了10%左右
有了对齐规则,我们就可以来编写代码了:
对齐和映射
首先,当我们获取到用户申请的内存数后,我们需要计算出对齐后的字节数,再获取映射的哈希桶的下标:我们将诸如计算的规则都封装进一个类中,完成封装:
class SizeClass
{
public://父函数暴露给用户完成大小判断static inline size_t Index(size_t bytes){}//父函数暴露给用户完成大小判断static inline size_t RoundUp(size_t size){}//子函数完成映射static inline size_t _Index(size_t bytes, size_t align_shift){}//子函数完成对齐static inline size_t _RoundUp(size_t bytes, size_t alignNum){}
};
- 设置成静态函数,方便调用
- 设置成内联函数,频繁调用情况下的优化
对齐
class SizeClass
{public://按照对齐规则,不同大小按照不同的对齐数进行对齐static inline size_t RoundUp(size_t size){if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8 * 1024){return _RoundUp(size, 128);}else if (size <= 64 * 1024){return _RoundUp(size, 1024);}else if (size <= 256 * 1024){return _RoundUp(size, 8 * 1024);}else{return _RoundUp(size, 1 << PAGE_SHIFT);}}
子函数完成对齐的计算:
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{return ((bytes + alignNum - 1) & ~(alignNum - 1));
}
使用位运算提高cpu计算速度:先加上7,即补上不足对齐数的字节,再与7取反后进行与运算,即保留最高位,即对齐数的位,并将其余位变为0,即可得到对齐数
映射
static inline size_t Index(size_t bytes)
{assert(bytes < MAX_BYTES);static int group[4] = { 16,56,56,56 };if (bytes <= 128){return _Index(bytes, 3);//对齐数:8 哈希桶区间:[0,16) 字节区间:[0,128]}else if (bytes <= 1024){return _Index(bytes - 128, 4) + group[0];//对齐数:16 哈希桶区间:[16,72) 字节区间:[128+1,1024]}else if (bytes <= 8 * 1024){return _Index(bytes - 1024, 7) + group[0] + group[1];//对齐数:128 哈希桶区间:[72,128) 字节区间:[1024+1,8*1024]}else if (bytes <= 64 * 1024) {return _Index(bytes - 8 * 1024, 10) + group[2] + group[1]+ group[0];//对齐数:1024 哈希桶区间:[128,184) 字节区间:[8*1024,64*1024]}else if (bytes <= 256 * 1024) {return _Index(bytes - 64 * 1024, 13) + group[3] +group[2] + group[1] + group[0];//对齐数:1024*8 哈希桶区间:[184,208) 字节区间:[64*1024,256*1024]}else {assert(false);}
}
子函数完成映射的计算:
static inline size_t _Index(size_t bytes, size_t align_shift)
{return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
桶下标的计算:先判断在哪个对齐区间的范围内,根据这个区间内的对齐数补上对应字节不足对齐数的部分但不会超过下一个范围,再除以对齐数得到次数在此范围内的第几个区间内,由于下标从0开始,因此再减去1即可。
align_shift:是将对齐数转化为2的次方数,目的是为了做位运算,提高cpu速度,比如2的3次方=8,那么align_shift=3
封装FreeList类
对自由链表进行封装,提供插入新内存块与弹出内存块的功能,方便后续对自由链表进行增删
class FreeList
{
public:void Push(void* obj){assert(obj);//头插NextObj(obj) = _freeList;_freeList = obj;}void* Pop(){assert(_freeList);//头删void* obj = _freeList;_freeList = NextObj(obj);return obj;}bool Empty(){return _freeList == nullptr;}
private:void* _freeList=nullptr;
};
thread cache的底层就是一个自由链表数组,大小为208,即存放208个自由链表的数组。
我们定义全局变量来提高代码可读性。
static const size_t MAX_BYTES = 256 * 1024;//对象大小不超过的大小
static const size_t NFREELISTS = 208;//桶的数量为208个
封装thread cache类
class ThreadCache
{
public://申请内存对象void* Allocate(size_t size);//从中心缓存获取对象void* FetchFromCentralCache(size_t index, size_t size);
private:FreeList _freeLists[NFREELISTS];
};
申请内存逻辑:
- 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
- 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。_
- 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。
void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);size_t alignSize = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);if (!_freeLists[index].Empty()){return _freeLists[index].Pop();}else{//找下一层:center cachereturn FetchFromCentralCache(index, alignSize);}
}
TLS无锁访问
由于我们的目的是将此内存池设计成高并发的状态下高效运行,因此我们的设计是将第一层thread cache设计成在多线程的情况下也能够无锁访问,这样能大大提高效率。
我们需要用到线程局部存储TLS(thread local storage),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。
Thread Local Storage(线程局部存储)TLS - 知乎 (zhihu.com)
//每个线程都有自己的threadcache -> TLS thread local storage做到多线程下可以无锁访问
static _declspec(thread)ThreadCache* pTLSThreadCache = nullptr;//当线程调用申请内存接口时先创建自己的thread cache
if (pTLSThreadCache == nullptr)
{pTLSThreadCache = new ThreadCache;
}
CenctralCache
整体设计
central cache与thread cache的联系
当某个线程申请某一块大小的内存时,通过对齐规则找到thread cache下的对应的桶下的自由链表后,若链表不为空,取一个内存块返回即可,若为空,那么则需要像下一层Central cache批量先申请内存。
central cache与thread cache的异同
- 结构一致:central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的(对齐规则一致)。
- 桶结构下的挂的内容不一致:central cache的每个哈希桶位置挂是SpanList双向链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一 个个小内存块对象挂在span的自由链表中。目的是为了方便thread cache的获取
- Central Cache是线程共享的:在thread cache层我们使用了线程局部存储,使得每个线程都拥有各自的thread cache,因此可以无锁访问,但central cache是线程共享的,因此在多线程的环境下需要加锁访问,而central cache使用的锁是 ”桶锁“ 也就是只有在多个线程访问同一个桶时才会加锁,而访问不同的桶时是不会存在锁竞争的。
页号规定
每个进程都有自己的进程地址空间,不同平台下进程地址空间的大小不同:32位下是2^32 、 64位下是2^64
页的大小一般为4K或者8K,此项目我们规定页的大小为8k,即8*1024=2^13、 那么在32位操作系统下进程地址空间就可以被分为2^19个页, 同理64位下可被分为2^51个页
32位下,_WIN32有定义, _WIN64无定义
64位下,_WIN32和 _WIN64都有定义
static const size_t PAGE_SHIFT = 13;//页号的位操作数//不同平台下定义不同的类型来存储页号
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#endif
span结构
central cache每个桶下面挂的双向链表连接的是一个个span,span被设计出来用来管理以页位单位的大块内存:
//管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0;//大块内存起始页的页号size_t _n = 0;//页的数量Span* _next = nullptr;//双向链表的结构Span* _prev = nullptr;size_t _objSize = 0; //切好的小对象的大小,用于释放时不需要传入大小size_t _useCount = 0;//切好小块内存,被分配给thread_cache的计数void* _freeList = nullptr;//切好的小块内存的自由链表bool _isUse = false;
};
- _pageId:大块内存的起始页的页号用于转化成大块内存的起始地址,也就是从堆中直接申请出来的空间中的某块地址,也为了方便后续与page cache前后页的合并交互,方便管理
- _n:每个span管理的页数不一样,由于向堆申请时是直接申请一大块内存,因此起始页的页号加页数相当于从大块内存上切下来一块进行管理
- _freeList:对于不同桶下面挂的span,需要切分好相应大小的内存块挂在span下的自由链表中,这样thread cache向central cache索要内存时,直接从链表中取即可
- _useCount:每当有thread cache从span中获取切好的小块内存时,需要进行计数,目的是为了将空闲的span还给page cache,当计数为0时就代表没有线程在使用这个span,即内存块已经全部使用完毕并归还,此时就还给page cache
- 双向链表的结构方便span在归还内存时移出所在之处。
SpanList结构
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}Span* Begin(){return _head->_next;}Span* End(){return _head;}bool Empty(){return _head->_next == _head;}Span* PopFront(){Span* front = _head->_next;Erase(front);return front;}void PushFront(Span* span){Insert(Begin(), span);}void Insert(Span* pos,Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;prev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;//不需要delete掉}
private:Span* _head;
public:std::mutex _mtx;//桶锁
};
封装central cache类
前面已经知晓了central cache和thread cache一样都是哈希桶的结构,并且对齐规则也一致,因此桶的数量也一致,区别在于桶下挂的是双向链表。
设置成单例
由于central cache是所有线程共享的,因此我们设置成单例下的饿汉模式,保证系统中该类的实例只有一个
class CentralCache
{
public:static CentralCache* GetInstance(){//在main函数之前就已经被创建出来了,也就是在线程前就创建好了//所以不存在线程安全问题return &_sInst;}
private:SpanList _spanLists[NFREELISTS];
private://构造和拷贝构造私有CentralCache(){}CentralCache(const CentralCache&){}static CentralCache _sInst;
};
CentralCache CentralCache::_sInst;//类外初始化
申请内存逻辑:
- 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对 象的数量使用了类似网络tcp协议拥塞控制的慢开始调节反馈算法;
- central cache也有一个哈希映射的 spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不 过这里使用的是一个桶锁,尽可能提高效率。
- central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的 span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span 中取对象给thread cache。
- central cache的中挂的span中use_count记录分配了多少个对象出去,分配一个对象给thread cache,就++use_count
慢开始调节反馈算法
我们把问题回到thread cache 的api:FetchFromCentralCache:当有线程向thread cache申请内存时,若对于的自由链表为空,则会向下一层central cache中获取内存,那一次应该索要多少个此内存大小的数量合适呢?要多了怕浪费,要少了怕频繁来索要,由于桶锁的存在,频繁索要定会大大降低效率,因此我们需要用到慢开始调节反馈算法:
- 慢开始调节反馈算法:小对象给多一点,大对象给少一点,对桶锁效率的调节。但并不会一次向central cache要太多,每次都慢慢增长(可以理解成随着使用频率的增加而申请更多),即不断有申请同一个size大小的内存需求,就不断增长直到上限,上限是给定的,分配的是慢慢增长的,直到超过上限,最多就只能给上限。那么上限是多少呢?我们给出一下函数用来计算上限:
//一次threadCache 从中心缓存获取多少个
static size_t NumMoveSize(size_t size)
{if (size == 0){return 0;}assert(size > 0);// [2, 512],一次批量移动多少个对象的(慢启动)上限值// 小对象一次批量上限高// 大对象一次批量上限低int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;
}
通过这个函数,我们可以计算出一次性给可以给thread cache分配的内存块的最大数量,比如我们申请索要8字节的小内存块,那么一次性最多可以分配给thread cache512个8字节的小内存块,而一次性索要200KB的大内存块,我们至少分配2个200KB的大内存块
那么如何来缓慢增长呢?-----我们在FreeList类中多封装一个成员变量_maxSize并初始化为1
class FreeList
{
public:size_t& MaxSize(){return _maxSize;}
private:size_t _maxSize = 1;//慢调节算法,一次最多向CenctralCache申请的最大内存块数量,会随着频率的使用而增加
};
当有线程申请6字节的内存时,通过对齐规则对齐到8字节,并映射到0号桶上(假设桶为空),桶为空时向central cache索要内存,central cache 通过比较此桶中_maxSize(假设此时为1) 与 计算出来的最大上限值–512,取出其中的较小值 maxSize作为此次能够分配给thread cache的内存块数量,并对此值+1,这样一来,随着访问此桶的频率增加, _maxSize的值逐渐增加,直到超过上限值512,之后不管申请几次,central cache都最多分配给thread cache 512个相应大小的内存块。
注意:当申请到多个数量的对象时,需要将第一个对象返回给thread cache的本次使用,剩余的需要挂到相应的桶下
FreeList 方便操作链表接口: PushRange 和 PopRange
class FreeList
{
public:void PushRange(void* start, void* end,size_t n){assert(start);assert(end);NextObj(end) = _freeList;_freeList = start;_size += n;}void PopRange(void*& start, void*& end, size_t n){assert(n <= _size);start = _freeList;end = start;for (size_t i = 0;i < n - 1;++i){if (end == nullptr){int x = 0;}end = NextObj(end);}_freeList = NextObj(end);NextObj(end) = nullptr;_size -= n;}bool Empty(){return _freeList == nullptr;}size_t& MaxSize(){return _maxSize;}
private:void* _freeList=nullptr;size_t _maxSize = 1;//慢调节算法,一次最多向CenctralCache申请的最大内存块数量,会随着频率的使用而增加
};
Thread cache 和 central cache 的交互接口:FetchFromCentralCache:
- 使用慢开始反馈调节算法计算出上限
- 根据上限向central cache 索要内存块,使用输出型参数获取内存块的起始和终止地址
- 根据实际获取的内存块数量返回一个给当前使用后,将剩余的插入链表中
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{//慢开始反馈调节算法size_t batchNum = min(_freeLists[index].MaxSize(),SizeClass::NumMoveSize(size));if (_freeLists[index].MaxSize() == batchNum){_freeLists[index].MaxSize() += 1;}void* start = nullptr;void* end = nullptr;//central cache提供api获取size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);assert(actualNum >= 1 );if (actualNum == 1){assert(start == end);return start;}else{//用头一个,把剩下的插入表中//提供api插入一段范围,减少循环所消耗的时间复杂度_freeLists[index].PushRange(NextObj(start), end,actualNum-1);return start;}
}
Central cache任务接口: FetchRangeObj:
- 根据size获取下标并从相应的桶中获取一个span
- 根据传入的形参batchNum从获取到的span中管理的自由链表切出内存,并填好输出型参数
- 在迭代的过程中记录实际获取的数量并返回
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock();//加桶锁访问//获取一个非空spanSpan* span = GetOneSpan(_spanLists[index], size);//此span和span下的自由链表都不能为空assert(span);assert(span->_freeList);//开始切分start = span->_freeList;end = start;//有可能batchNum > _freeList中的num//有多少给多少size_t i = 0;size_t actualNum = 1;while (i < batchNum - 1 && NextObj(end)!=nullptr){end = NextObj(end);++i;++actualNum;}span->_freeList = NextObj(end);//剩下的放回去span管理的自由链表NextObj(end) = nullptr;//取出的链表尾置空span->_useCount += actualNum;//更新计数_spanLists[index]._mtx.unlock();return actualNum;
}
central cache第二个任务接口:GetOneSpan
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{//查看当前的spanlist是否还有未分配对象的spanSpan* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr){return it;}elseit = it->_next;}//到这里说明没找到,需要往下一层pageCache找
}
PageCache
整体设计
- page cache 与 central cache 的联系
当thread cache向central cache索要内存块时,如果相应的桶下挂有管理好的span时,切分span下的自由链表并返回即可,若相应的桶下没有挂有span,则需要再向下一层page cache索要span块。
- page cache 与 central cache的异同
-
结构一致:与central cache一样,page cache也是哈希桶结构,并且桶下挂的也是以双链表结构链接的span
-
映射规则不一致:thread cache 和 central cache采用的都是同一套对齐与映射规则,而page cache采用的与他们都不一致,采用了直接定址法,并且每一个桶下对应的下标就代表此桶下管理的span所管理的页数,即n号桶下维护的span均管理着n页的内存块
-
服务对象不一致:central cache下的span均被切分好并挂在span维护的自由链表下,而page cache下的span没有被切分,他们两个的服务对象也不相同,central cache切好span挂在自由链表下,提供服务给thread cache,page cache不切分span,而是在central cache的span不够用时直接提供给central cache。
-
桶的数量不一致:最大页数的span决定了page cache下桶的数量,在这里我们规定span的最大页数为128页,因此桶的数量需要设置在129个(除去0号桶)----128页=128 * 8 * 1024 字节 = 4 * 256 * 1024 字节= 4 * 256KB=4 * MAX_BYTES,也就是一个128页的span可以被切分成4块thread cache能够提供的最大字节数,足够满足需求。可根据具体情况提供
static const size_t NPAGES = 129;//最大页数为128页
申请内存逻辑:
- 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有,则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页page span和一个6页page span。
- 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式(具体看操作系统) 申请128页page span挂在自由链表中,再重复1中的过程。
- 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质 区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist 中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的 spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
封装page cache类
锁问题
- 在thread cache中不需要加锁,是因为线程局部存储,每个线程都有独立的thread cache
- 在central cache中需要加锁,但加的是 ’‘桶锁‘’ 即thread cache访问不同的桶时不用带锁访问,访问同一个桶时才需要带锁访问
- 在page cache中需要加锁,因为central cache中有可能不同的桶都同时访问page cache,存在线程安全问题,不同central cache的是,page cache加的不是桶锁,而是对整个类进行加锁,因为page cache中各个桶直接需要相互帮助,因此一次访问page cache可能会遍历到所有的桶,若加桶锁的话会出现大量频繁的加锁解锁,降低效率。
设置成单例
page cache也是所有线程共享的,只有一个,因此设计成单例的饿汉模式
class PageCache
{
public:static PageCache* GetInstance(){return &_sInst;}
public:std::mutex _pageMtx;
private:SpanList _spanLists[NPAGES];
private:PageCache() {}PageCache(const PageCache&){}static PageCache _sInst;
};CentralCache CentralCache::_sInst;//类外初始化
增加SpanList接口
central cache和page cache的桶下挂的都是带头双向循环链表,在获取对象时避免不了遍历链表,因此模仿迭代器提供出接口方便遍历:
class SpanList
{
public:Span* Begin(){return _head->_next;}Span* End(){return _head;}
private:Span* _head;
public:std::mutex _mtx;
};
计算申请数量
在thread cache与central cache交互部分我们使用了慢开始反馈调节算法计算了一次性申请的内存块的数量,同样的,我们也需要规定,central cache向page cache索要内存时一次性能够索要多少页的内存块,目的是为了避免频繁的访问下一层。我们指定了一下方法:
- 根据对象大小计算出thread cache 索要字节的最大上限分配数,并乘以单个对象大小,得出最大上限分配字节,再讲最大上限分配字节的单位转化为页,若不足以一页则对齐到一页。
class SizeClass
{public:static size_t NumMovePage(size_t size){//先算出threadCache索要字节的最大上限分配数,用此数来算出centralcache索要的页数size_t num = NumMoveSize(size);size_t npage = num * size;//单位转化:>>相当于除以2的PAGE_SHIFT次方,即页的单位为8knpage >>= PAGE_SHIFT;if (npage == 0){npage = 1;}return npage;}
};
central cache 任务接口的补充:GetOneSpan
- 先解开桶锁,再加pageCache的大锁----thread cache有可能会释放内存,先解开桶锁能够避免此桶的阻塞,等到切分完span之后再重新加上桶锁
- 获取到span后,开始切分span管理的内存并挂在自由链表下,以便提供服务给thread cache
- 地址与页号之间的转化相当于单位之间的转化,地址转字节就是除以8*1024(>>13),字节转化地址就是乘以8 * 1024(<<13)
- 尾插保证了地址的连续性,提高线程的cpu缓存命中率
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{//查看当前的spanlist是否还有未分配对象的spanSpan* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr){return it;}elseit = it->_next;}//到这里说明没找到,往下一层pageCache找//计算大块内存的起始地址和大块内存的大小// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞list._mtx.unlock();// 同时对pageCache加大锁PageCache::GetInstance()->_pageMtx.lock();Span* span=PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));PageCache::GetInstance()->_pageMtx.unlock();//将页号转化为内存地址,获取span的起始地址char* start = (char*)(span->_pageId << PAGE_SHIFT);//将页数转化为字节size_t bytes = span->_n << PAGE_SHIFT;//获取span的终止地址char* end = start + bytes;//开始切分大块内存//把大块内存切成自由链表连接起来//先切一块下来做头,方便尾插,尾插保持地址的连续span->_freeList = start;start += size;void* tail = span->_freeList;while (start < end){NextObj(tail) = start;tail = NextObj(tail);start += size;}NextObj(tail) = nullptr;//由于切的时候此时获取到的span其他线程是无法访问的,因此暂时先不加锁,等到要插入时,访问桶时在加锁// 切好span以后,需要把span挂到桶里面去的时候,再加锁list._mtx.lock();list.PushFront(span);return span;
}
SpanList的头插接口:PushFront
class SpanList
{
public:void PushFront(Span* span){Insert(Begin(), span);}void Insert(Span* pos,Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;prev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}private:Span* _head;
public:std::mutex _mtx;
};
SpanLists的头删结构:PopFront
当central cache向Page cache索要span时,首先检测的是对应的桶下有无span,有的话就直接在该桶下头删一个span并返回给central cache,因此需要对应的接口PopFront:
class SpanList
{
public:bool Empty(){return _head->_next == _head;}Span* PopFront(){Span* front = _head->_next;Erase(front);return front;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;//不需要delete掉}
private:Span* _head;//static ObjectPool<Span> _spanPool;
public:std::mutex _mtx;
};
central cache 与 page cache的交互接口:NewSpan
-
现在page cache的相应桶下查看是否有span,有则将span返回,没有则继续往后寻找
-
当找大更大的桶时,只需要此大桶下挂的大号span切分成两个小号span即可
-
若后面的桶中页找不到span,则需要直接向堆申请128页的内存并挂在page cache下管理128页内存的span的桶中,再次调用newSpan函数即可。(用递归调用的方法避免了代码的冗余)
Span* PageCache::NewSpan(size_t k) {assert(k);assert(k < NPAGES);//先检查第k页下的桶有没有span管理的内存块if (!_spanLists[k].Empty()){//如果非空,弹出去前需要先建立映射,方便回收Span* kSpan = _spanLists[k].PopFront();for (PAGE_ID i = 0;i < kSpan->_n;++i){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}//若没有,向后找到更大页的内存进行切分for (size_t i = k+1;i < NPAGES;++i){if (!_spanLists[i].Empty()){//将大块内存切成两个小块Span* nSpan = _spanLists[i].PopFront();Span* kSpan = _spanPool.New();kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;//此span只能管理k页的内存块->切分完成nSpan->_pageId += k;nSpan->_n -= k;//将此剩下的挂到相应页数的桶下_spanLists[nSpan->_n].PushFront(nSpan);//存储nspan的首尾页号和nspan的映射,方便回收pagecache的合并_idSpanMap[nSpan->_pageId] = nSpan;_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;//建立页id与地址的映射(即页号应该放在哪个span中)方便回收centralcache时找到对应的spanfor (PAGE_ID i = 0;i < kSpan->_n;++i){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}//走到这说明后面也无大页span,找系统申请128页的spanSpan* bigSpan = _spanPool.New();void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);return NewSpan(k); }
ThreadCache回收内存
-
当释放内存小于256k时将内存释放回thread cache,thread cache通过待释放对象size计算映射自由链表桶位置i,并将对象Push 到_freeLists[i]。
-
当不断有同一个桶位置的内存被释放,链表的长度会越来越长,则回收一部分内存对象到central cache。
-
我们规定,当自由链表的长度超过它一次性向central cache申请对象的个数时(_maxSize),就还给central cache:ListTooLong
Thread Cache任务接口:Deallocate
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);//找出桶的位置,头插即可size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);if (_freeLists[index].Size() >= _freeLists[index].MaxSize()){ListTooLong(_freeLists[index], size);}
}
ThreadCache任务接口:ListTooLong
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{void* start = nullptr;void* end = nullptr;list.PopRange(start, end, list.MaxSize());CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
FreeList新增接口:PopRange,新增成员变量:_size :方便获取当前链表的长度
使用输出型参数获取链表的一段范围的内存块
class FreeList
{
public:void Push(void* obj){assert(obj);//头插NextObj(obj) = _freeList;_freeList = obj;_size++;}void* Pop(){assert(_freeList);//头删void* obj = _freeList;_freeList = NextObj(obj);_size--;return obj;}void PushRange(void* start, void* end,size_t n){assert(start);assert(end);NextObj(end) = _freeList;_freeList = start;_size += n;}void PopRange(void*& start, void*& end, size_t n){assert(n <= _size);start = _freeList;end = start;for (size_t i = 0;i < n - 1;++i){if (end == nullptr){int x = 0;}end = NextObj(end);}_freeList = NextObj(end);NextObj(end) = nullptr;_size -= n;}bool Empty(){return _freeList == nullptr;}size_t& MaxSize(){return _maxSize;}size_t Size(){return _size;}
private:void* _freeList=nullptr;size_t _maxSize = 1;//慢调节算法,一次最多向CenctralCache申请的最大内存块数量,会随着频率的使用而增加size_t _size = 0;//当前链表堆积的内存块数
};
CentralCache回收内存
- 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时–use_count。
- 当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache
- page cache中会对前后相邻的空闲页进行合并。
地址与span的映射关系
需要注意的是,thread cache在归还链表时,由于central cache中span的数量不止一个,因此我们需要知道此链表是属于哪一块span的,必须归还到指定的span才能保证内存的连续,以便于page cache前后页合并
由于页号与地址的转化是通过除法获得,因此我们可以利用除法,我们假设一页为100字节,地址1-99间的所有地址转化为页号都是第0页,而地址100-199间的所有地址转化为页号也都是第1页,同理,一页为8kb字节也是如此。
因此我们只需管理好页号与span的关系即可,即哪一个页号是属于哪一个span的,一旦thread cache有自由链表需要归还时,我们只需要将地址转化为页号,并查看此页号的相关span即可
我们使用哈希表unordered_map来实现映射关系,由于此映射关系在page cache的合并时也需要用到,因此我们新增page cache的成员变量,并提供接口来检查传入的地址是否有映射span,有则返回,没有则返回空
page cache新增成员变量:
class PageCache
{
public:Span* MapObjectToSpan(void* obj);
private:std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};
page cache接口NewSpan的修改
由于需要建立好映射关系,因此在central cache索要k页span时,需要提前建立好这k页内存的页号与span的关系
修改:
- 若在第k号桶下找到了k页的span,则需要先建立此span所管理的所有页与span的映射
- 若k号桶为空,但向后找到了更大的n号桶进行切分,并分别挂到k号桶以及n-k号桶下,此时我们不仅需要建立即将返回的k号桶的span管理的页号与span 的映射关系,还需要建立第n-k号桶的首页与尾页与span的映射关系,目的是为了让centra cahe归还span给page cache时,page cache能够进行前后页的合并,减少内存碎片的产生
Span* PageCache::NewSpan(size_t k)
{assert(k);assert(k < NPAGES);//先检查第k页下的桶有没有span管理的内存块if (!_spanLists[k].Empty()){//如果非空,弹出去前需要先建立映射,方便回收Span* kSpan = _spanLists[k].PopFront();for (PAGE_ID i = 0;i < kSpan->_n;++i){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}//若没有,向后找到更大页的内存进行切分for (size_t i = k+1;i < NPAGES;++i){if (!_spanLists[i].Empty()){//将大块内存切成两个小块Span* nSpan = _spanLists[i].PopFront();Span* kSpan = _spanPool.New();kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;//此span只能管理k页的内存块->切分完成nSpan->_pageId += k;nSpan->_n -= k;//将此剩下的挂到相应页数的桶下_spanLists[nSpan->_n].PushFront(nSpan);//存储nspan的首尾页号和nspan的映射,方便回收pagecache的合并_idSpanMap[nSpan->_pageId] = nSpan;_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;//建立页id与地址的映射(即页号应该放在哪个span中)方便回收centralcache时找到对应的spanfor (PAGE_ID i = 0;i < kSpan->_n;++i){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}//走到这说明后面也无大页span,找系统申请128页的spanSpan* bigSpan = _spanPool.New();void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);return NewSpan(k);
}
获取地址的映射span接口:MapObjectToSpan
注意:C++的STL不保证线程安全,因此需要对哈希表的查找加锁,防止线程不安全导致的检查错误
Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;//stl不保证线程安全,Free在查找时有可能pageCache内部正在修改// 使用基数桶后不再需要加锁std::unique_lock<std::mutex> lock(_pageMtx);auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}if (ret != nullptr){return ret;}else{//一定是有建立映射才进来查找assert(false);return nullptr;}return ret;
}
thread cache 与 central cache交互接口:ReleaseListToSpans
- 遍历自由链表,获取其映射的span,插入到span管理的自由链表下,更新计数_useCount
- 当计数_useCount为0时,说明span分配出去的链表已经全部归还,此时将span进一步归还给page cache
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock();//遍历list,并放回对应的spanwhile (start){void* next = NextObj(start);//通过先前切分时建立的地址与span块的映射关系找到下一层中对应的spanSpan* span = PageCache::GetInstance()->MapObjectToSpan(start);NextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--;if (span->_useCount == 0){//已全部归还,这个span归还给pagecache//先弹出span,并将自由链表置空_spanLists[index].Erase(span);span->_freeList = nullptr;span->_next = nullptr;span->_prev = nullptr;//先解掉桶锁,再加page的大锁_spanLists[index]._mtx.unlock();PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();//把桶锁加回去_spanLists[index]._mtx.lock();}start = next;}_spanLists[index]._mtx.unlock();
}
PageCache回收内存
- 如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span, 看是否可以合并,如果合并继续向前寻找。
- 这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
前后页探索过程
假设此时central cache归还一个页号为id,页数为n页的span:
先向前合并:
首先我们需要检测页号为 id-1 的页号是否存在有span的映射关系,若没有,则不需要再向前合并,若有,则有可能有以下三种情况:
-
由于我们是通过映射关系获取的span,而不是遍历page cache桶获取的span,因此此时我们并不知道此span是否被central cache借走,因此我们需要增加Span的成员变量:_isUse并初始化为false(没有被使用状态),一旦span被借走使用则将状态修改为true
//管理多个连续页大块内存跨度结构 struct Span {bool _isUse = false; };//修改central cache中任务函数接口:GetOneSpan:PageCache::GetInstance()->_pageMtx.lock();Span* span=PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));span->_objSize = size;span->_isUse = true;PageCache::GetInstance()->_pageMtx.unlock();
-
第一种情况:_isUse为true,说明此时这个span已经被借走了,并不存在page cache的桶下,因此不再需要合并
-
第二种情况:_isUse为false,说明此时这个span并没有被借走,可能是被归还的没有合并成功的span,也有可能是被切分的大块span中留下来的那块span,这就是在NewSpan函数中建立留下来的那块span的首页与尾页的映射关系的作用,此时就要进行合并
-
第三种情况:合并后的页数已经超过最大页数----128页,则我们选择不合并
再向后合并:
同理,首先我们需要检测页号为 id+n 的页号是否存在有span的映射关系,若没有,则不需要向后合并,若有,则有以下三种情况:
- 第一种情况:_isUse为true,说明此时这个span已经被借走了,并不存在page cache的桶下,因此不再需要合并
- 第二种情况:_isUse为false,说明并没有被借走,此时进行合并
- 第三种情况:合并后的页数已经超过最大页数----128页,则我们选择不合并
前后页均合并完成后,再将大块span重新插入桶中,小块span移除,再重新建立大块span首页与尾页的映射关系即可:
void PageCache::ReleaseSpanToPageCache(Span* span)
{//对span前后的页尝试进行合并,缓解内存碎片问题//向前合并while (1){PAGE_ID prevId = span->_pageId - 1;auto ret = _idSpanMap.find(prevId);//前面的页号不存在内存块,不需要合并if (ret == _idSpanMap.end()){break;}//前面的页号存在内存块,但被借走了Span* prevSpan = ret->second;if(prevSpan->_isUse==true){break;}//合并出超过128页的span不合并if (prevSpan->_n + span->_n > NPAGES - 1){break;}span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;_spanLists[prevSpan->_n].Erase(prevSpan);delete prevspan;}//向后合并while (1){PAGE_ID nextId = span->_pageId + span->_n;auto ret = _idSpanMap.find(nextId);//前面的页号不存在内存块,不需要合并if (ret == _idSpanMap.end()){break;}Span* nextSpan = ret->second;if (nextSpan->_isUse == true){break;}if (span->_n + nextSpan->_n > NPAGES - 1){break;}span->_n += nextSpan->_n;//将nextSpan从对应的双链表中移除_spanLists[nextSpan->_n].Erase(nextSpan);delete nextSpan;}//向前向后都合并完毕,调整合并后的span_spanLists[span->_n].PushFront(span);_idSpanMap[span->_pageId] = span;_idSpanMap[span->_pageId + span->_n-1] = span;//状态置为falsespan->_isUse = false;
}
细节优化
当内存大于256KB时的申请问题
目的的代码,thread cache只支持申请小于等于256KB(32页)内存大小,而page cache只支持128页以下内存大小的申请,因此我们需要对超过256KB内存大小的申请做一下处理。我们规定:
申请内存大小 | 申请内存方法 |
---|---|
[1,256KB]----32页之内 | 向thread cache 申请 |
[256KB,1024KB]----32页-128页之间 | 向page cache申请 |
超过128页 | 直接向堆申请 |
不管是多大内存的申请,我们都需要先根据对齐规则对齐,对于32页以上内存的申请,我们按页对齐,这一点在对齐函数RoundUp中已经进行了处理:
用户使用接口:ConcurrentAlloc
用户在使用时,并不是直接调用thread cache的alloc去申请内存的,我们参考空间配置器的设计,将高并发内存池封装成一个接口给用户使用,用户只需要传入需要申请内存的大小即可,而接口ConcurrentAlloc需要对大小进行判断,若大于32页的内存,则直接去page cache 申请(直接调用NewSpan获取对应页数的span即可),小于32页的内存才去thread cache申请
static void* ConcurrentAlloc(size_t size)
{if (size > MAX_BYTES){//大于256KB直接去找pageCache//计算出按页对齐后需要申请的页数size_t alignSize = SizeClass::RoundUp(size);size_t kPage = alignSize >> PAGE_SHIFT;//向pageCache申请kPage的spanPageCache::GetInstance()->_pageMtx.lock();Span* span=PageCache::GetInstance()->NewSpan(kPage);span->_objSize = size;span->_isUse = true;PageCache::GetInstance()->_pageMtx.unlock();void* ptr = (void*)(span->_pageId << PAGE_SHIFT);return ptr;}//256KB以内的去thread cache申请else{if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}return pTLSThreadCache->Allocate(size);}
}
修改page cache任务接口:NewSpan
对于页数大于128页的大块内存申请,只需在NewSpan函数加上对应判断页数,若超过128页再从page cache去堆直接申请并返回,若小于128页的才继续走下面的逻辑。
Span* PageCache::NewSpan(size_t k)
{assert(k);
//assert(k < NPAGES);
//如果申请大于128页,直接找堆申请即可
if (k > NPAGES - 1)
{void* ptr = SystemAlloc(k);Span* span = _spanPool.New();span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;span->_n = k;//建立映射_idSpanMap[span->_pageId] = span;return span;
}//先检查第k页下的桶有没有span管理的内存块if (!_spanLists[k].Empty()){//如果非空,弹出去前需要先建立映射,方便回收Span* kSpan = _spanLists[k].PopFront();for (PAGE_ID i = 0;i < kSpan->_n;++i){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}//若没有,向后找到更大页的内存进行切分for (size_t i = k+1;i < NPAGES;++i){if (!_spanLists[i].Empty()){//将大块内存切成两个小块Span* nSpan = _spanLists[i].PopFront();Span* kSpan = _spanPool.New();kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;//此span只能管理k页的内存块->切分完成nSpan->_pageId += k;nSpan->_n -= k;//将此剩下的挂到相应页数的桶下_spanLists[nSpan->_n].PushFront(nSpan);//存储nspan的首尾页号和nspan的映射,方便回收pagecache的合并_idSpanMap[nSpan->_pageId] = nSpan;_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;//建立页id与地址的映射(即页号应该放在哪个span中)方便回收centralcache时找到对应的spanfor (PAGE_ID i = 0;i < kSpan->_n;++i){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}//走到这说明后面也无大页span,找系统申请128页的spanSpan* bigSpan = _spanPool.New();void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);return NewSpan(k);
}
释放内存
目前我们只完成了用户申请内存的逻辑,当用户使用完毕后,需要将内存进行释放回收,我们同样的提供接口ConcurrentFree给用户使用,便于回收。
同时,由于我们在申请时,根据申请内存大小的不同,申请的方式也不同,因此释放时,也需要根据对应的申请方式做到对应的释放:
释放内存大小 | 释放方式 |
---|---|
[1,256KB]----32页之内 | 释放给thread cache |
[256KB,1024KB]----32页-128页之间 | 释放给page cache |
超过128页 | 直接释放给堆 |
注意
我们以malloc和free函数做参考,malloc在申请内存时,只需要传入需要申请内存的大小即可,这一点我们在ConcurrentAlloc已经得以实现并走通逻辑,而free函数,只需要传入需要释放内存的头指针即可自动释放内存。但我们目前的代码中我们知道,我们每次申请内存时,都是从一大块连续空间的大内存切分成小内存出来使用的,因此我们释放内存时,如果不知道准确无误的对应内存块的大小,很有可能会将别的内存块释放掉,进而造成程序错误;同时,我们还需要根据大小来做出释放方式的选择,所以为了知道将释放内存块的大小,最容易想到的方法就是在用户调用ConcurrentFree时,除了传入指针以外,还需要传入大小,举个例子:
void* p1 = ConcurrentAlloc(2000);
ConcurrentFree(p1,2000);
这样我们便能在ConcurrentFree的逻辑里面提取出对齐后的大小,从而进行准确无误的释放内存块,但这种方法过于简陋,有以下的缺点:
- 手动输入的错误率大,有可能在输入大小时失误操作导致错误,从而释放错内存块
- 与free函数的使用方法不一致
那么我们如何做到优化呢?
ConcurrentFree优化----自动通过传入指针获取大小,不需要用户输入
由于我们在每一次返回span时,都会建立页号与span间的映射,而页号其实就是地址转化的,因此我们可以通过传入的指针来获取对应的页号,再通过页号获取对应的span块,所以我们可以将每一次申请的size交给span来管理,这样就可以通过地址自动获取大小了:
Span结构体的修改:增加对象大小:_objSize
//管理多个连续页大块内存跨度结构
struct Span
{size_t _objSize = 0; //切好的小对象的大小,用于释放时不需要传入大小
};
当每一次调用到NewSpan获取到span时,就将此成员变量填入保存:
//两处修改:
//1.GetOneSpan
//2.ConcurrentAlloc
Span* span=PageCache::GetInstance()->NewSpan(kPage);
span->_objSize = size;
这样一来,我们就可以来走通ConcurrentFree的逻辑了:
用户使用接口:ConcurrentFree
-
先获取指针指向内存块所在的span,并从span中提取出对象大小size
-
再判断大小,根据大小做出对应的释放方式
static void ConcurrentFree(void* ptr)//待解决,如何用大小 {//大于256KB时,是向PageCache申请的,那么释放时自然直接还给PageCache即可Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = span->_objSize;if (size > MAX_BYTES){PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);} }
page cache任务接口修改:ReleaseSpanToPageCache
新增细节:先判断所需释放的内存页数是否大于128页,是的话直接调用封装的SystemFree接口直接释放给堆,并将span释放掉
void PageCache::ReleaseSpanToPageCache(Span* span) {if (span->_n > NPAGES - 1){//如果大于128页,那么是向堆直接申请的,就直接释放给堆void* ptr = (void*)(span->_pageId << PAGE_SHIFT);SystemFree(ptr);delete span;return;}//对span前后的页尝试进行合并,缓解内存碎片问题//向前合并while (1){PAGE_ID prevId = span->_pageId - 1;auto ret = _idSpanMap.find(prevId);//前面的页号不存在内存块,不需要合并if (ret == _idSpanMap.end()){break;}//前面的页号存在内存块,但被借走了Span* prevSpan = ret->second;if(prevSpan->_isUse==true){break;}//合并出超过128页的span不合并if (prevSpan->_n + span->_n > NPAGES - 1){break;}span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;_spanLists[prevSpan->_n].Erase(prevSpan);delete prevspan;}//向后合并while (1){PAGE_ID nextId = span->_pageId + span->_n;auto ret = _idSpanMap.find(nextId);//前面的页号不存在内存块,不需要合并if (ret == _idSpanMap.end()){break;}Span* nextSpan = ret->second;if (nextSpan->_isUse == true){break;}if (span->_n + nextSpan->_n > NPAGES - 1){break;}span->_n += nextSpan->_n;//将nextSpan从对应的双链表中移除_spanLists[nextSpan->_n].Erase(nextSpan);delete nextSpan;}//向前向后都合并完毕,调整合并后的span_spanLists[span->_n].PushFront(span);_idSpanMap[span->_pageId] = span;_idSpanMap[span->_pageId + span->_n-1] = span;//状态置为falsespan->_isUse = false; }
封装向堆释放内存接口:SystemFree
向堆直接释放空间有以下方法:
Windows下:VirtualFree
Linux下: sbrk和unmmap
inline static void* SystemFree(void* ptr) { #ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE); #else // linux下sbrk unmmap等 #endifif (ptr == nullptr)throw std::bad_alloc();return ptr; }
定长内存池组件----脱离malloc
span脱离malloc
设计高并发内存池的初衷是为了学习tcmalloc的思想,使其能够在高并发的场景下替代malloc进行高效率的内存申请,因此当然在使用时是要脱离malloc函数的,但当前我们代码在实现时,有部分使用了new函数来开辟空间(span等),而new的底层其实就是封装了malloc函数,为了完全脱离malloc函数,我们使用我们之前所写的定长内存池组件----ObjectPool来替代代码中new出来的空间
由于Span空间的开辟都来源于page cache,因此我们直接在page cache类中添加新的成员变量:ObjectPool来为span空间的开辟服务:
class PageCache
{
private:ObjectPool<Span> _spanPool;
};
接下来只需要将代码中所有new span 以及 delete span的部分替换成_spanPool.New(), _spanPool.Delete()即可
//Span* span=new span;
Span* span = _spanPool.New();//delete span
_spanPool.Delete(span);
thread cache脱离malloc
我们在thread cache中使用了线程局部存储,而每个线程的全局thread cache也是new出来的,因此也需要替换掉:
if (pTLSThreadCache == nullptr)
{//pTLSThreadCache = new ThreadCache;static ObjectPool<ThreadCache> tcPool;static std::mutex tcMtx;tcMtx.lock();pTLSThreadCache = tcPool.New();tcMtx.unlock();
}
return pTLSThreadCache->Allocate(size);
高并发环境对比malloc测试
我们使用以下测试函数来测试高并发环境下此内存池与malloc的性能对比:
//单轮申请和释放次数、线程数、轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(malloc(16 + i % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc" << ntimes << "次: 花费:" << malloc_costtime << "ms" << endl;cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次free" << ntimes << "次: 花费:" << free_costtime << "ms" << endl;cout << nworks << "个线程并发执行" << rounds << "malloc&free" << nworks * rounds * ntimes << "次: 总计花费:" << malloc_costtime + free_costtime << "ms" << endl;
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(ConcurrentAlloc(16 + i % 8192 + 1));//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次concurrent alloc" << ntimes << "次: 花费:" << malloc_costtime << "ms" << endl;cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次concurrent dealloc" << ntimes << "次: 花费:" << free_costtime << "ms" << endl;cout << nworks << "个线程并发执行" << rounds << "concurrent alloc&dealloc" << nworks * rounds * ntimes << "次: 总计花费:" << malloc_costtime + free_costtime << "ms" << endl;}
int main()
{size_t n = 10000;cout << "==========================================================" <<endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" <<endl;return 0;
}
测试内容:多线程环境下申请和释放不同内存大小的空间,并比较时间,我们设置4个线程,一共进行10轮测试,每一次申请和释放各10000次
运行之后我们发现:
对比之后我们发现,concurrent alloc相较于malloc申请的效率还是挺可观的,但是申请有多可观,释放就有多不可观,所以我们需要找出释放内存时性能遇到瓶颈的原因,再针对原因进行优化
性能瓶颈分析
使用vs编译器下性能分析工具
我们可以在调试窗口中,找到 “性能探查器” ----注意要在Debug模式下进行
选中检测后点击下方的开始即可
选中对应文件点击确定即可等待分析结果;分析时,推荐将申请和释放次数减少,并只需要测试ConcurrentAlloc和ConcurrentFree即可,减少时间浪费
性能分析结果
我们发现最耗时的函数是ConcurrentFree
点击后发现基本上全是MapObjectToSpan占用的时间
我们再来看看Deallocate中哪部分最耗时间:
Deallocate函数中:ListTooLong函数的时间占比最大,再往下看:
ListTooLong函数中,时间占比最大的是调用函数ReleaseListToSpans,接着往下看:
原来最终最消耗时间的还是MapObjectToSpan:
而MapObjectToSpan中占时间最多的就是防止STL线程安全问题的加锁与解锁
因此根据性能优化的结果,我们需要对性能瓶颈----页号与span映射容器的加锁问题进行优化,使得我们在读取映射容器时,能够避免加锁
根据性能瓶颈优化----基数树
基数树的底层就是分层的哈希表,可分为单层基数树,二层基数树以及三层基数树
单层基数树
单层基数树采用的是直接定址法,由于我们规定了一页为8k,那么在32位平台下, 页的数目就是2^32 / 2^13 = 2^19 ,同理,64位平台下,页的数目就是2^51,而单层基数树其实就是开辟与页数相同数量的指针数组,每一个页号对应的span的地址就存储在此数组中:
// 单层基数树
template <int BITS>
class TCMalloc_PageMap1 {
private:static const int LENGTH = 1 << BITS;void** array_;
public:typedef uintptr_t Number;explicit TCMalloc_PageMap1() {size_t size = sizeof(void*) << BITS;size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);memset(array_, 0, sizeof(void*) << BITS);}void* get(Number k) const {if ((k >> BITS) > 0) {return NULL;}return array_[k];}void set(Number k, void* v) {assert((k >> BITS) == 0);array_[k] = v;}
};
- set函数建立映射,get函数读取映射
- 很明显64位平台下不能使用单层基数树,那样需要开辟2^51 * 8 =254=224G,显然不可以
- 32位平台下开辟的空间大小为:2^19 * 4 = 2^21 = 2M
二层基数树
二层基数树底层也是数组,不过建立了两次映射:以32位平台为例,页号最多可以使用19个比特位,先拿出前5个比特位在第一层映射,再用剩下的比特位映射到第二层,最终将span指针存储在映射两层后的空间里:
底层:容量为32的指针数组,存放的指针指向容量为2^14的指针数组,因此,二层基数树若全部开辟也是需要32 * 2^14 * 4 = 2M
所以二层基数树若全部空间开辟也是和单层基数树消耗一样的空间,巧妙的是,二层基数树可以设计成不需要提取开辟第二层的空间,只需要提前开辟好第一层的空间并在有映射到来时再开辟第二层对应的空间即可,但由于2M的空间不算太大,因此我们直接提取全部开辟:
//双层基数树
template <int BITS>
class TCMalloc_PageMap2 {
private:static const int ROOT_BITS = 5;static const int ROOT_LENGTH = 1 << ROOT_BITS;static const int LEAF_BITS = BITS - ROOT_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodespublic:typedef uintptr_t Number;explicit TCMalloc_PageMap2() {memset(root_, 0, sizeof(root_));//第一层进行初始化PreallocateMoreMemory();//直接将第二层全部开辟}void* get(Number k) const {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 || root_[i1] == NULL) {return NULL;}return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;//第一层下标const Number i2 = k & (LEAF_LENGTH - 1);//第二层下标assert(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;// 页号超出范围if (i1 >= ROOT_LENGTH)return false;// Make 2nd level node if necessaryif (root_[i1] == NULL) {static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {//直接全部开辟Ensure(0, 1 << BITS);}
};
注意:单层与二层适用于32位平台不适用于64位平台
三层基数树
同样的,三层基数树就是比二层基数树再多一层映射
由于在64位平台下,提取全部开辟空间不可能实现,因此三层基数树就一定要设计成有映射到来时再开辟对应空间,在这之前先开辟第一层即可,这样可以节省空间:
//三层基数树
template <int BITS>
class TCMalloc_PageMap3 {
private:static const int INTERIOR_BITS = (BITS + 2) / 3; // 一二层总数static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;//叶子层总数static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Interior nodestruct Node {Node* ptrs[INTERIOR_LENGTH];};// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Node* root_; // Root of radix tree//void* (*allocator_)(size_t); // Memory allocatorNode* NewNode() {//Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));static ObjectPool<Node> nodePool;Node* result = nodePool.New();if (result != NULL) {memset(result, 0, sizeof(*result));}return result;}
public:typedef uintptr_t Number;//explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap3() {root_ = NewNode();}void* get(Number k) const {const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 ||root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];}void set(Number k, void* v) {ASSERT(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);// Check for overflowif (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)return false;// Make 2nd level node if necessaryif (root_->ptrs[i1] == NULL) {Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}// Make leaf node if necessaryif (root_->ptrs[i1]->ptrs[i2] == NULL) {/*leaf* leaf = reinterpret_cast<leaf*>((*allocator_)(sizeof(leaf)));*/static ObjectPool<Leaf> leafPool;Leaf* leaf = leafPool.New();if (leaf == nullptr) return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {}
};
使用基数树优化代码
我们在32位平台下举例,我们需要将page cache中原先封装用来映射的哈希表更换成基数树:
并且在每一个需要建立映射的地方,替换成基数树的映射方式:
在每一个需要读取映射的地方,替换成基数树的读取映射方式:
最后修改性能瓶颈的罪魁祸首:MapObjectToSpans,使用了基数树后就不再有STL的线程安全问题,因此取消加锁来提高性能:
Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;//使用基数桶后不再需要加锁Span* ret = (Span*)_idSpanMap.get(id);assert(ret != nullptr);return ret;
}
基数树不需要加锁
由于STL中哈希表unordered_map的底层哈希表的空间是没有提取开辟的,并且就算提前开辟了,在散列函数的影响下,其中的数据位置会发生变化,因此多线程下一旦发现插入和读取同时进行,就会发生线程安全问题。
而基数树的空间一旦开辟了是不会发生数据移动的,只要建立了映射都是固定一个位置进行读取映射,而此位置也不会被覆盖,因为每个页号都是唯一的,开辟的空间也是唯一的,不存在对同一个位置进行多次映射的情况。
基数树的性能分析
我们使用了基数树优化了代码后,再一次来进行测试:
dealloc的速度对比之前有了飞跃的提升,因此使得整体的效率也上去了,性能得到了真实的改进
项目源码
ConcurrentPool