文章目录
- Page Cache层的结构
- 申请内存的流程
- 释放内存的流程
- Page Cache对象结构
- Page Cache所需要的成员变量
- Page Cache所需要的成员变量
- GetInstance()的实现
- NewSpan()的实现
- GetOneSpan()的实现
- NewSpan()的实现
- Page Cache层实现的全部代码
- Common.h
- ThreadCache.h
- ThreadCache.cpp
- ConcurrentAlloc.h
- CentralCache.h
- CentralCache.cpp
- PageCache.h
- PageCache.cpp
Page Cache层的结构
Central Cache也是一个哈希桶结构,但是它的映射关系和之前两次有所不同,Page Cache的映射关系和span的页数有关一页span(8K)所以,Page Cache最多有128页的span。
申请内存的流程
- 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页page span和一个6页page span。
- 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。
- 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
释放内存的流程
如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
这里就先不实现之后再实现。
Page Cache对象结构
Page Cache和Central Cache一样全局只能有一个对象,所以我们要把Page Cache对象设计出单例模式里面的懒汉模式
Page Cache所需要的成员变量
#pragma once#include "Common.h"//单例模式
class PageCache
{
public:std::mutex _pageMtx;
private:SpanList _spanLists[NPAGES];PageCache()//设为私有,防止构造新的对象{}PageCache(const PageCache&) = delete;//防止外面拷贝构造新对象static PageCache _sInst;//设成static全局变量,保证所有线程可见
};
SpanList _spanLists[NPAGES]
:哈希桶结构,每个位置对应相应页数的span。
NPAGES
:为129所以数组有129个位置,但是0位置不挂任何东西,因为没有0页的span
NPAGES设为129是为了方便我们映射,1page就银蛇在下标为1处,我们不用-0处理,当然我们也可以只建128个也没问题。
std::mutex _pageMtx
:用成全局锁,因为如果用桶锁有些情况下桶锁不住得用全局锁才可以。
Page Cache所需要的成员变量
#pragma once#include "Common.h"class PageCache
{
public://获取全局唯一的page cache对象static PageCache* GetInstance(){}// 获取k也spanSpan* NewSpan(size_t k);std::mutex _pageMtx;
private:SpanList _spanLists[NPAGES];PageCache(){}PageCache(const PageCache&) = delete;static PageCache _sInst;
};
GetInstance()
:获取全局唯一的Page Cache对象
NewSpan()
:获取k页的span
GetInstance()的实现
static PageCache* GetInstance(){//把全局唯一的对象传引用给外界return &_sInst;}
//这里用static的原因和之前Central Cache层一样
NewSpan()的实现
在实现 NewSpan()执勤我们要先实现Central Cache的GetOneSpan(),之前因为GetOneSpan()的实现逻辑和Page Cache层相关,所以就没有实现。现在已经知道Page Cache层的结构就可以实现了。
GetOneSpan()的实现
GetOneSpan()因为是外面是直接传入SpanList对象,所以我们只要判断该SpanList上还有没有未分配的SpanList即可。
有,则直接返回一个span对象
没有,则向Page Cache层索要一个Span对象
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{// 查看当前的spanlist中是否有还有未分配对象的span//因为要遍历spanlist,所以我们要提供span的头节点,和为节点,方便遍历。//补充点1:补充Begin()和End()的实现Span* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr){return it;}else{it = it->_next;}}// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞list._mtx.unlock();// 走到这里说没有空闲span了,只能找page cache要PageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));PageCache::GetInstance()->_pageMtx.unlock();// 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span// 计算span的大块内存的起始地址和大块内存的大小(字节数)char* start = (char*)(span->_pageId << PAGE_SHIFT);size_t bytes = span->_n << PAGE_SHIFT;char* end = start + bytes;// 把大块内存切成自由链表链接起来// 1、先切一块下来去做头,方便尾插//补充点2:切span的过程span->_freeList = start;start += size;void* tail = span->_freeList;int i = 1;while (start < end){++i;NextObj(tail) = start;tail = NextObj(tail); // tail = start;start += size;}NextObj(tail) = nullptr;// 切好span以后,需要把span挂到桶里面去的时候,再加锁list._mtx.lock();//补充点3:PushFront(span)的实现list.PushFront(span);return span;
}
补充点1:补充Begin()和End()的实现
// 带头双向循环链表
// 展示部分SpanList
class SpanList
{
public:Span* Begin(){return _head->_next;}Span* End(){return _head;}
private:Span* _head;
public:std::mutex _mtx; // 桶锁
};
补充点2:切span的过程
切span的代码
span->_freeList = start;
start += size;//size是一个对象的大小
void* tail = span->_freeList;
int i = 1;
while (start < end)
{++i;NextObj(tail) = start;tail = NextObj(tail); // tail = start;start += size;
}
NextObj(tail) = nullptr;
切span的过程
开始切
start先向后走,走到下一个对象的起始地址
开始链接:end的前几个字节存储start指向的下一个对象的首地址
end向后走,start也向后走
再开始链接
之后就这样一步一步的链接到最后。
这里的end就是上面的tail,改麻烦就没改。
到最后tail要置空,防止越界访问。
补充点3:PushFront()的实现
PushFront()过程图
PushFront()的实现
复用了Insert。
// 带头双向循环链表
// 展示部分SpanList
class SpanList
{
public:Span* Begin(){return _head->_next;}void PushFront(Span* span){Insert(Begin(), span);}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;// prev newspan posprev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}
private:Span* _head;
public:std::mutex _mtx; // 桶锁
};
NewSpan()的实现
// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);// 先检查第k个桶里面有没有span//补充点1:PopFront()的实现if (!_spanLists[k].Empty()){//有则返回k页span对象return _spanLists[k]->PopFront();}// 检查一下后面的桶里面有没有span,如果有可以把他它进行切分//补充点2:切分span的逻辑for (size_t i = k+1; i < NPAGES; ++i){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;// 在nSpan的头部切一个k页下来// k页span返回// nSpan再挂到对应映射的位置kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;_spanLists[nSpan->_n].PushFront(nSpan);return kSpan;}}// 走到这个位置就说明后面没有大页的span了// 这时就去找堆要一个128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);//递归调用NewSpanreturn NewSpan(k);
}
补充点1:PopFront()的实现
PopFront():过程图
PopFront()的实现
复用Erase。
// 带头双向循环链表
class SpanList
{
public:Span* PopFront(){Span* front = _head->_next;Erase(front);return front;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}private:Span* _head;
public:std::mutex _mtx; // 桶锁
};
补充点2:切分span的逻辑
比如Central Cache向Page Cache要一个2页的span,但是2页的span出正好没有。
而后面有一个128页的span就可以切分成2页和126页的span
之后2页的span给Central Cache层,126页的span挂在126桶下
然后一直这样切分直到满足不了Central Cache层的需求Page Cache就向系统再申请一个128页的page。
Page Cache层实现的全部代码
Common.h
#pragma once#include <iostream>
#include <vector>
#include <algorithm>#include <time.h>
#include <assert.h>#include <thread>
#include <mutex>using std::cout;
using std::endl;#ifdef _WIN32#include <windows.h>
#else// ...
#endifstatic const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;
static const size_t NPAGES = 129;
static const size_t PAGE_SHIFT = 13;#ifdef _WIN64typedef unsigned long long PAGE_ID;
#elif _WIN32typedef size_t PAGE_ID;
#else// linux
#endif// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else// linux下brk mmap等
#endifif (ptr == nullptr)throw std::bad_alloc();return ptr;
}static void*& NextObj(void* obj)
{return *(void**)obj;
}// 管理切分好的小对象的自由链表
class FreeList
{
public:void Push(void* obj){assert(obj);// 头插//*(void**)obj = _freeList;NextObj(obj) = _freeList;_freeList = obj;}void PushRange(void* start, void* end){NextObj(end) = _freeList;_freeList = start;}void* Pop(){assert(_freeList);// 头删void* obj = _freeList;_freeList = NextObj(obj);return obj;}bool Empty(){return _freeList == nullptr;}size_t& MaxSize(){return _maxSize;}private:void* _freeList = nullptr;size_t _maxSize = 1;
};// 计算对象大小的对齐映射规则
class SizeClass
{
public:// 整体控制在最多10%左右的内碎片浪费// [1,128] 8byte对齐 freelist[0,16)// [128+1,1024] 16byte对齐 freelist[16,72)// [1024+1,8*1024] 128byte对齐 freelist[72,128)// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)static inline size_t _RoundUp(size_t bytes, size_t alignNum){return ((bytes + alignNum - 1) & ~(alignNum - 1));}static inline size_t RoundUp(size_t size){if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8*1024){return _RoundUp(size, 128);}else if (size <= 64*1024){return _RoundUp(size, 1024);}else if (size <= 256 * 1024){return _RoundUp(size, 8*1024);}else{assert(false);return -1;}}static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}// 计算映射的哪一个自由链表桶static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);// 每个区间有多少个链static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128){return _Index(bytes, 3);}else if (bytes <= 1024){return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024){return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024){return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];}else if (bytes <= 256 * 1024){return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];}else{assert(false);}return -1;}// 一次thread cache从中心缓存获取多少个static size_t NumMoveSize(size_t size){assert(size > 0);// [2, 512],一次批量移动多少个对象的(慢启动)上限值// 小对象一次批量上限高// 小对象一次批量上限低int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;}// 计算一次向系统获取几个页static size_t NumMovePage(size_t size){size_t num = NumMoveSize(size);size_t npage = num*size;npage >>= PAGE_SHIFT;if (npage == 0)npage = 1;return npage;}
};// 管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0; // 大块内存起始页的页号size_t _n = 0; // 页的数量Span* _next = nullptr; // 双向链表的结构Span* _prev = nullptr;size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数void* _freeList = nullptr; // 切好的小块内存的自由链表
};// 带头双向循环链表
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}Span* Begin(){return _head->_next;}Span* End(){return _head;}bool Empty(){return _head->_next == _head;}void PushFront(Span* span){Insert(Begin(), span);}Span* PopFront(){Span* front = _head->_next;Erase(front);return front;}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;// prev newspan posprev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}private:Span* _head;
public:std::mutex _mtx; // 桶锁
};
ThreadCache.h
#pragma once#include "Common.h"class ThreadCache
{
public:// 申请和释放内存对象void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);// 从中心缓存获取对象void* FetchFromCentralCache(size_t index, size_t size);
private:FreeList _freeLists[NFREELIST];
};// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
ThreadCache.cpp
#include "ThreadCache.h"
#include "CentralCache.h"void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{// 慢开始反馈调节算法// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完// 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限// 3、size越大,一次向central cache要的batchNum就越小// 4、size越小,一次向central cache要的batchNum就越大size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));if (_freeLists[index].MaxSize() == batchNum){_freeLists[index].MaxSize() += 1;}void* start = nullptr;void* end = nullptr;size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);assert(actualNum > 0);if (actualNum == 1){assert(start == end);return start;}else{_freeLists[index].PushRange(NextObj(start), end);return start;}
}void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);size_t alignSize = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);if (!_freeLists[index].Empty()){return _freeLists[index].Pop();}else{return FetchFromCentralCache(index, alignSize);}
}void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);// 找对映射的自由链表桶,对象插入进入size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);// ...
}
ConcurrentAlloc.h
#pragma once#include "Common.h"
#include "ThreadCache.h"static void* ConcurrentAlloc(size_t size)
{// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}cout << std::this_thread::get_id() << ":"<<pTLSThreadCache<<endl;return pTLSThreadCache->Allocate(size);
}static void ConcurrentFree(void* ptr, size_t size)
{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);
}
CentralCache.h
#pragma once#include "Common.h"// 单例模式
class CentralCache
{
public:static CentralCache* GetInstance(){return &_sInst;}// 获取一个非空的spanSpan* GetOneSpan(SpanList& list, size_t byte_size);// 从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);private:SpanList _spanLists[NFREELIST];private:CentralCache(){}CentralCache(const CentralCache&) = delete;static CentralCache _sInst;
};
CentralCache.cpp
#include "CentralCache.h"
#include "PageCache.h"CentralCache CentralCache::_sInst;// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{// 查看当前的spanlist中是否有还有未分配对象的spanSpan* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr){return it;}else{it = it->_next;}}// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞list._mtx.unlock();// 走到这里说没有空闲span了,只能找page cache要PageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));PageCache::GetInstance()->_pageMtx.unlock();// 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span// 计算span的大块内存的起始地址和大块内存的大小(字节数)char* start = (char*)(span->_pageId << PAGE_SHIFT);size_t bytes = span->_n << PAGE_SHIFT;char* end = start + bytes;// 把大块内存切成自由链表链接起来// 1、先切一块下来去做头,方便尾插span->_freeList = start;start += size;void* tail = span->_freeList;int i = 1;while (start < end){++i;NextObj(tail) = start;tail = NextObj(tail); // tail = start;start += size;}NextObj(tail) = nullptr;// 切好span以后,需要把span挂到桶里面去的时候,再加锁list._mtx.lock();list.PushFront(span);return span;
}// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock();Span* span = GetOneSpan(_spanLists[index], size);assert(span);assert(span->_freeList);// 从span中获取batchNum个对象// 如果不够batchNum个,有多少拿多少start = span->_freeList;end = start;size_t i = 0;size_t actualNum = 1;while ( i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);++i;++actualNum;}span->_freeList = NextObj(end);NextObj(end) = nullptr;span->_useCount += actualNum;_spanLists[index]._mtx.unlock();return actualNum;
}
PageCache.h
#pragma once#include "Common.h"class PageCache
{
public:static PageCache* GetInstance(){return &_sInst;}// ȡһKҳspanSpan* NewSpan(size_t k);std::mutex _pageMtx;
private:SpanList _spanLists[NPAGES];PageCache(){}PageCache(const PageCache&) = delete;static PageCache _sInst;
};
PageCache.cpp
#include "PageCache.h"PageCache PageCache::_sInst;// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);// 先检查第k个桶里面有没有spanif (!_spanLists[k].Empty()){return _spanLists[k]->PopFront();}// 检查一下后面的桶里面有没有span,如果有可以把他它进行切分for (size_t i = k+1; i < NPAGES; ++i){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;// 在nSpan的头部切一个k页下来// k页span返回// nSpan再挂到对应映射的位置kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;_spanLists[nSpan->_n].PushFront(nSpan);return kSpan;}}// 走到这个位置就说明后面没有大页的span了// 这时就去找堆要一个128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);return NewSpan(k);
}