课程地址:https://pdos.csail.mit.edu/6.S081/2020/schedule.html
Lab 地址:https://pdos.csail.mit.edu/6.S081/2020/labs/thread.html
我的代码地址:https://github.com/Amroning/MIT6.S081/tree/lock
xv6手册:https://pdos.csail.mit.edu/6.S081/2020/xv6/book-riscv-rev1.pdf
相关翻译:https://xv6.dgs.zone/labs/requirements/lab8.html
参考博客:https://blog.miigon.net/posts/s081-lab8-locks/
学习笔记记录,如有错误恳请各位大佬指正
Lab8: locks
更改数据结构和锁定策略以减少争用,提高并行性,提高性能
Memory allocator(moderate)
完整题目请在顶部链接查看
您的工作是实现每个CPU的空闲列表,并在CPU的空闲列表为空时进行窃取。所有锁的命名必须以“
kmem
”开头。也就是说,您应该为每个锁调用initlock
,并传递一个以“kmem
”开头的名称。运行kalloctest
以查看您的实现是否减少了锁争用。要检查它是否仍然可以分配所有内存,请运行usertests sbrkmuch
。您的输出将与下面所示的类似,在kmem
锁上的争用总数将大大减少,尽管具体的数字会有所不同。确保usertests
中的所有测试都通过。评分应该表明考试通过。
先看一下原本kalloc中的代码。原本代码中定义了一个结构体kmem
,将里面的freelist
字段作为空闲物理页作为链表项,使空闲页形成一个链表。分配物理页就是把freelist
从链表移除,释放物理页就是把要释放的页连回链表:
// kernel/kalloc.c
struct {struct spinlock lock;struct run *freelist;
} kmem;void
kfree(void *pa)
{struct run *r;if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)panic("kfree");// Fill with junk to catch dangling refs.memset(pa, 1, PGSIZE);r = (struct run*)pa;acquire(&kmem.lock);r->next = kmem.freelist;kmem.freelist = r;release(&kmem.lock);
}void *
kalloc(void)
{struct run *r;acquire(&kmem.lock);r = kmem.freelist; // 取出一个物理页。页表项本身就是物理页。if(r)kmem.freelist = r->next;release(&kmem.lock);if(r)memset((char*)r, 5, PGSIZE); // fill with junkreturn (void*)r;
}
分配和释放物理页都是操作共享数据,修改freelist链表,因此为了线程安全,这些操作都加上了锁。这样就导致了同一时刻只能有一个线程申请分配或释放内存,多线程没法并行执行这些操作,限制了并发效率。可以在实验题目中看到测试实例,kmem锁竞争很激烈:
$ kalloctest
start test1
test1 results:
--- lock kmem/bcache stats
lock: kmem: #fetch-and-add 83375 #acquire() 433015
lock: bcache: #fetch-and-add 0 #acquire() 1260
--- top 5 contended locks:
lock: kmem: #fetch-and-add 83375 #acquire() 433015 //锁竞争最激烈
lock: proc: #fetch-and-add 23737 #acquire() 130718
lock: virtio_disk: #fetch-and-add 11159 #acquire() 114
lock: proc: #fetch-and-add 5937 #acquire() 130786
lock: proc: #fetch-and-add 4080 #acquire() 130786
tot= 83375
test1 FAIL
根据实验提示,可以为每一个CPU都声明一个freelist锁。CPU申请分配或释放内存,不会影响到另一个CPU执行相同的操作,因此给当前CPU上锁,不会影响到另一个CPU。代码如下(stealing_lock
下面解释):
struct {struct spinlock lock;struct spinlock stealing_lock; // 防止死锁问题,添加一个偷锁struct run* freelist;
} kmem[NCPU]; // 每个CPU分配独立的freelist,多个CPU并发分配物理内存不会相互竞争char* kmem_lock_names[] = {"kmem_cpu_0","kmem_cpu_1","kmem_cpu_2","kmem_cpu_3","kmem_cpu_4","kmem_cpu_5","kmem_cpu_6","kmem_cpu_7",
};void
kinit()
{for (int i = 0;i < NCPU;++i) {initlock(&kmem[i].lock, kmem_lock_names[i]);initlock(&kmem[i].stealing_lock, kmem_lock_names[i] + 's');}freerange(end, (void*)PHYSTOP);
}
相应的释放内存的代码:
void
kfree(void *pa)
{......r = (struct run*)pa;push_off();int cpu = cpuid(); // 获取cpu编号,中断关闭时调用cpuid才是安全的,所以上面用push_off关闭中断acquire(&kmem[cpu].lock); //将释放的页插入当前CPU的freelist中r->next = kmem[cpu].freelist;kmem[cpu].freelist = r;release(&kmem[cpu].lock);pop_off(); //重新打开中断
}
分配内存kalloc
的时候,可能会出现,在当前CPU已经没有freelist
的情况(该CPU空闲内存不足),需要从其他CPU偷内存页,涉及到共享数据的修改,所以分配内存时需要加锁,偷页的时候也要加上锁:
void *
kalloc(void)
{struct run *r;push_off(); //关闭中断int cpu = cpuid();acquire(&kmem[cpu].lock);if (!kmem[cpu].freelist) { // 当前CPU已经没有freelist的时候,去其他CPU偷内存页int steal_left = 64; // 这里指定偷64个内存页for (int i = 0;i < NCPU;++i) {if (i == cpu)continue; // 跳过当前CPUacquire(&kmem[i].lock);if (!kmem[i].freelist) { // 如果在想要偷页的cpu也没有freelist了,就释放锁跳过release(&kmem[i].lock);continue;}struct run* rr = kmem[i].freelist;while (rr && steal_left) { // 循环将kmem[i]的freelist移动到kmem[cpu]中kmem[i].freelist = rr->next;rr->next = kmem[cpu].freelist;kmem[cpu].freelist = rr;rr = kmem[i].freelist;steal_left--;}release(&kmem[i].lock);if (steal_left) // 偷到指定页数后退出循环break;}}r = kmem[cpu].freelist;if(r)kmem[cpu].freelist = r->next;release(&kmem[cpu].lock);pop_off(); //打开中断if (r)memset((char*)r, 5, PGSIZE); // fill with junkreturn (void*)r;
}
这里会有一个问题:cpu1在持有自身的锁的时候去cpu2偷页,此时cpu2也在持有自身锁的时候去cpu1偷页,造成死锁。
一种解决方案是,使用一个额外的锁stealing_lock
(就叫他偷锁了),在要去偷页的时候,先加上偷锁,然后释放自己的freelist锁。偷完之后,先加上freelist锁,再释放偷锁,因此上面的代码修改一下:
...for (int i = 0;i < NCPU;++i) {if (i == cpu)continue; // 跳过当前CPUacquire(&kmem[i].lock);if (!kmem[i].freelist) { // 如果在想要偷页的cpu也没有freelist了,就释放锁跳过release(&kmem[i].lock);continue;}acquire(&kmem[cpu].stealing_lock);release(&kmem[cpu].lock); // 加上偷锁,释放freelist锁,防止两个进程互相偷页时造成的死锁问题struct run* rr = kmem[i].freelist;while (rr && steal_left) { // 循环将kmem[i]的freelist移动到kmem[cpu]中kmem[i].freelist = rr->next;rr->next = kmem[cpu].freelist;kmem[cpu].freelist = rr;rr = kmem[i].freelist;steal_left--;}acquire(&kmem[cpu].lock);release(&kmem[cpu].stealing_lock);release(&kmem[i].lock);if (steal_left) // 偷到指定页数后退出循环break;}
...
在xv6当前设计下,一个进程只要还拿着至少一个锁,就不会被从当前cpu上调度走,而偷锁又是该cpu独享的。所以实际上只要某个偷锁被一个进程拿着,就不会有第二个进程能拿到这个偷锁,从而不会出现环路等待,从根本上消除死锁的可能性
现在可以验证实验是否正确
Buffer cache(hard)
修改块缓存,以便在运行
bcachetest
时,bcache(buffer cache的缩写)中所有锁的acquire
循环迭代次数接近于零。理想情况下,块缓存中涉及的所有锁的计数总和应为零,但只要总和小于500就可以。修改bget
和brelse
,以便bcache中不同块的并发查找和释放不太可能在锁上发生冲突(例如,不必全部等待bcache.lock
)。你必须保护每个块最多缓存一个副本的不变量。完成后,您的输出应该与下面显示的类似(尽管不完全相同)。确保usertests
仍然通过。完成后,make grade
应该通过所有测试。
该实验很多细节地方需要考虑,完整、专业的讲解还请看 参考博客 。该帖是按自己的理解整理归纳
bcache.lock用来保护高速缓存区的缓存块,多个进程不能同时操作磁盘缓存
bcache 中的区块缓存是会被多个进程、多个CPU共享的(多个进程可以同时访问同一个区块),所以不能模仿上一个实验,为每个CPU分配专属的块
static struct buf*
bget(uint dev, uint blockno)
{struct buf *b;acquire(&bcache.lock);// Is the block already cached?for(b = bcache.head.next; b != &bcache.head; b = b->next){if(b->dev == dev && b->blockno == blockno){b->refcnt++;release(&bcache.lock);acquiresleep(&b->lock);return b;}}// Not cached.// Recycle the least recently used (LRU) unused buffer.for(b = bcache.head.prev; b != &bcache.head; b = b->prev){if(b->refcnt == 0) {b->dev = dev;b->blockno = blockno;b->valid = 0;b->refcnt = 1;release(&bcache.lock);acquiresleep(&b->lock);return b;}}panic("bget: no buffers");
}
原本的设计中,当想要获取一个buf的时候,会给整个缓存区上锁,根据块号blockno
查找对应块是否已经缓存区,若在的话就讲块引用数+1,释放锁返回。不在的话,就遍历链表,将最近最久未使用的切引用数为0的buf(后面称为LRU-buf)作为缓存区块,释放锁返回该区块
改进方案为:建立一个由blockno
和dev
到buf的哈希表,通过一个特定的哈希公式映射到哈希桶,由此在每个哈希桶上加锁。这样,只有在两个进程同时访问的区块同时哈希到同一个桶的时候,才会发生锁竞争。当桶中的空闲 buf 不足的时候,从其他的桶中获取 buf
先定义哈希表相关:
// kernel/bio.c
// 哈希表中的桶号索引。根据提示,设置质数个桶可以降低哈希冲突的可能性
#define NBUFMAP_BUCKET 13
// 哈希索引
#define BUFMAP_HASH(dev, blockno) ((((dev)<<27)|(blockno))%NBUFMAP_BUCKET)
修改bcache结构体(驱逐锁后面解释),给每一个桶声明一个锁:
struct {
// struct spinlock lock;struct buf buf[NBUF];struct spinlock eviction_lock; // 驱逐锁// 哈希表struct buf bufmap[NBUFMAP_BUCKET];struct spinlock bufmap_locks[NBUFMAP_BUCKET]; // 桶锁
} bcache;
这时候的设计思路是:在bget
中,上当前的桶锁,索引到哈希表中看对应buf是否存在,存在就直接释放锁返回该块,不存在就在所有桶寻找一个LRU-buf,寻找的时候上对应的桶锁,找到后就释放这个桶锁,查询结束将该buf移出他原本所在的桶(称为缓存驱逐),然后加入到blockno对应的桶中,释放锁返回该地址。
这样会产生的问题如下(下面的描述会很抽象,因为这是我按自己理解概括了的 参考博客 的内容,省略了大量内容,如果不理解请去原博客学习)
1.查询时刻的buf和驱逐时刻的buf状态不一致。查询的时候该buf符合要求,但是查询完对应的桶之后会释放该桶锁,释放之后该buf就可以被其他进程访问了,在驱逐的时候就不一定符合要求了。
解决方法是,每一次查询桶都记录该桶的桶号,查询完该桶,如果找到新的LRU-buf就不释放该桶锁,直到驱逐完再解锁
2.两个进程查询桶时会造成环路死锁。如:CPU1在桶1中拿着1锁,要去桶2查询,CPU2在桶2拿着2锁,要去桶1查询,CPU1拿不到2锁,CPU2也拿不到1锁,就会造成死锁。
解决方法是,去查询其他桶之前,释放自己当前的桶锁,再去查询,查询结束后,再获取桶锁,把LRU-buf加入桶,这样就可以避免死锁
但是这样会引入新的问题:CPU检查blockno的buf是否存在缓存区,不在的话就释放桶锁去查询其他桶。因为释放了该桶的桶锁,其他CPU也会通过同样的blockno索引到相同的桶中,拿到桶锁,检查到buf不在缓存区,然后释放锁区查询桶,也就是同样的操作执行多次,会导致一个区块有多份缓存的情况。
原博客的解决方法是,牺牲一点效率保证安全:添加一个新的锁eviction_lock
,驱逐锁。释放桶锁后,加上驱逐锁(注意顺序),然后马上再次判断blockno的buf是否存在缓存区,确保不会创建重复的缓存buf。若不存在,就开始执行查询操作。最后把buf添加到桶后才释放锁
这样,即使有多个线程同时用同一个blockno访问同一个桶,并都检查到blockno的buf不在缓存区,也都只会有一个线程能拿到线程锁,然后去其他桶拿buf,把buf放到自己的桶后才释放驱逐锁,其他被驱逐锁卡住的线程拿到驱逐锁后也会先检查buf是否存在缓存区,这个时候可以查询到,直接释放锁返回。
这样做的好处:保证了查找过程中不会出现死锁,并且不会出现极端情况下一个块产生多个缓存的情况。坏处:驱逐锁相当于全局锁,使得原本可并发的遍历驱逐过程的并行性降低了。并且每一次 cache miss 的时候,都会多一次额外的桶遍历开销。
修改后的代码:
// kernel/buf.h
struct buf {int valid; // has data been read from disk?int disk; // does disk "own" buf?uint dev;uint blockno;struct sleeplock lock;uint refcnt;// struct buf *prev; // LRU cache liststruct buf *next;uchar data[BSIZE];uint lastuse; //用于跟踪LRU-buf
};
// kernel/bio.c
// 哈希表中的桶号索引
#define NBUFMAP_BUCKET 13
// 哈希索引
#define BUFMAP_HASH(dev, blockno) ((((dev)<<27)|(blockno))%NBUFMAP_BUCKET)struct {
// struct spinlock lock;struct buf buf[NBUF];struct spinlock eviction_lock; // 驱逐锁// 哈希表struct buf bufmap[NBUFMAP_BUCKET];struct spinlock bufmap_locks[NBUFMAP_BUCKET]; // 桶锁
} bcache;void
binit(void)
{// 初始化桶锁for (int i = 0;i < NBUFMAP_BUCKET;++i) {initlock(&bcache.bufmap_locks[i], "bcache_bufmap");bcache.bufmap[i].next = 0;}for (int i = 0;i < NBUF;++i) {// 初始化缓存区块struct buf* b = &bcache.buf[i];initsleeplock(&b->lock, "buffer");b->lastuse = 0;b->refcnt = 0;// 将所有缓存区块添加到bufmap[0]b->next = bcache.bufmap[0].next;bcache.bufmap[0].next = b;}initlock(&bcache.eviction_lock, "bcache_eviction");
}// Look through buffer cache for block on device dev.
static struct buf*
bget(uint dev, uint blockno)
{struct buf *b;uint key = BUFMAP_HASH(dev, blockno);acquire(&bcache.bufmap_locks[key]);// blockno的缓存区块是否已经在缓存区中for (b = bcache.bufmap[key].next;b;b = b->next) {if(b->dev == dev && b->blockno == blockno){b->refcnt++;release(&bcache.bufmap_locks[key]);acquiresleep(&b->lock);return b;}}// 不在缓存区// 为了防止死锁,先释放当前桶锁release(&bcache.bufmap_locks[key]);// 为了防止blocknod的缓存区块被重复创建,加上驱逐锁acquire(&bcache.eviction_lock);// 释放桶锁~加驱逐锁的间隙可能创建了blocknod的缓存区块,因此再检查一次for (b = bcache.bufmap[key].next;b;b = b->next) {if (b->dev == dev && b->blockno == blockno) {acquire(&bcache.bufmap_locks[key]); // 添加引用次数时必须加上桶锁b->refcnt++;release(&bcache.bufmap_locks[key]);release(&bcache.eviction_lock);acquiresleep(&b->lock);return b;}}// 仍然不在缓存区// 此时只持有驱逐锁,不持有任何桶锁。查询所有桶中的LRU-bufstruct buf* before_least = 0; // LRU-buf的前一个块uint holding_bucket = -1; //记录当前持有哪个桶锁// 循环查询所有桶for (int i = 0;i < NBUFMAP_BUCKET;++i) {acquire(&bcache.bufmap_locks[i]); // 获取当前遍历的桶锁(在找到下一个LRU-buf或驱逐内存之前都不释放)int newfound = 0; // 是否在当前桶找到的新的LRU-buffor (b = &bcache.bufmap[i];b->next;b = b->next) {if (b->next->refcnt == 0 && (!before_least || b->next->lastuse < before_least->next->lastuse)) {before_least = b;newfound = 1;}}if (!newfound) // 如果没找到找到新的LRU-buf,就释放当前的桶锁release(&bcache.bufmap_locks[i]);else { // 找到了新的LRU-bufif (holding_bucket != -1) // 如果当前找到的不是第一个LRU-buf,之前肯定持有某个桶锁,需要释放 release(&bcache.bufmap_locks[holding_bucket]);holding_bucket = i; // 把标记 holding_bucket 更改成当前桶锁编号}}// 如果没找到任何一个LRU-buf,表示没有空闲缓存块了if (!before_least)panic("bget: no buffuers");b = before_least->next; // b=LRU-bufif (holding_bucket != key) { // 想要偷的块如果不在key桶,就要把块从他所在的桶驱逐出来before_least->next = b->next;release(&bcache.bufmap_locks[holding_bucket]);//将LRU-buf添加到key桶acquire(&bcache.bufmap_locks[key]);b->next = bcache.bufmap[key].next;bcache.bufmap[key].next = b;}// 设置新buf的字段b->dev = dev;b->blockno = blockno;b->refcnt = 1;b->valid = 0;// 可以释放相关锁了release(&bcache.bufmap_locks[key]);release(&bcache.eviction_lock);acquiresleep(&b->lock);return b;
}void
brelse(struct buf *b)
{if(!holdingsleep(&b->lock))panic("brelse");releasesleep(&b->lock);uint key = BUFMAP_HASH(b->dev, b->blockno);acquire(&bcache.bufmap_locks[key]);b->refcnt--;if (b->refcnt == 0) {b->lastuse = ticks;}release(&bcache.bufmap_locks[key]);
}void
bpin(struct buf* b) {uint key = BUFMAP_HASH(b->dev, b->blockno);acquire(&bcache.bufmap_locks[key]);b->refcnt++;release(&bcache.bufmap_locks[key]);
}void
bunpin(struct buf* b) {uint key = BUFMAP_HASH(b->dev, b->blockno);acquire(&bcache.bufmap_locks[key]);b->refcnt--;release(&bcache.bufmap_locks[key]);
}
与原博客相比省略的细节:锁竞争优化思路、伪代码描述思路、死锁基础、悲观乐观锁、讨论区很多优质讨论。还是推荐去原博客学习,毕竟这只是我的学习记录__(:з」∠)_