Mit6.S081笔记Lab8: locks 锁

news/2024/11/13 21:35:54/文章来源:https://www.cnblogs.com/Amroning/p/18544863

课程地址:https://pdos.csail.mit.edu/6.S081/2020/schedule.html
Lab 地址:https://pdos.csail.mit.edu/6.S081/2020/labs/thread.html
我的代码地址:https://github.com/Amroning/MIT6.S081/tree/lock
xv6手册:https://pdos.csail.mit.edu/6.S081/2020/xv6/book-riscv-rev1.pdf
相关翻译:https://xv6.dgs.zone/labs/requirements/lab8.html
参考博客:https://blog.miigon.net/posts/s081-lab8-locks/

学习笔记记录,如有错误恳请各位大佬指正

Lab8: locks

更改数据结构和锁定策略以减少争用,提高并行性,提高性能

Memory allocator(moderate)

完整题目请在顶部链接查看

您的工作是实现每个CPU的空闲列表,并在CPU的空闲列表为空时进行窃取。所有锁的命名必须以“kmem”开头。也就是说,您应该为每个锁调用initlock,并传递一个以“kmem”开头的名称。运行kalloctest以查看您的实现是否减少了锁争用。要检查它是否仍然可以分配所有内存,请运行usertests sbrkmuch。您的输出将与下面所示的类似,在kmem锁上的争用总数将大大减少,尽管具体的数字会有所不同。确保usertests中的所有测试都通过。评分应该表明考试通过。

​ 先看一下原本kalloc中的代码。原本代码中定义了一个结构体kmem,将里面的freelist字段作为空闲物理页作为链表项,使空闲页形成一个链表。分配物理页就是把freelist从链表移除,释放物理页就是把要释放的页连回链表:

// kernel/kalloc.c
struct {struct spinlock lock;struct run *freelist;
} kmem;void
kfree(void *pa)
{struct run *r;if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)panic("kfree");// Fill with junk to catch dangling refs.memset(pa, 1, PGSIZE);r = (struct run*)pa;acquire(&kmem.lock);r->next = kmem.freelist;kmem.freelist = r;release(&kmem.lock);
}void *
kalloc(void)
{struct run *r;acquire(&kmem.lock);r = kmem.freelist; // 取出一个物理页。页表项本身就是物理页。if(r)kmem.freelist = r->next;release(&kmem.lock);if(r)memset((char*)r, 5, PGSIZE); // fill with junkreturn (void*)r;
}

​ 分配和释放物理页都是操作共享数据,修改freelist链表,因此为了线程安全,这些操作都加上了锁。这样就导致了同一时刻只能有一个线程申请分配或释放内存,多线程没法并行执行这些操作,限制了并发效率。可以在实验题目中看到测试实例,kmem锁竞争很激烈:

$ kalloctest
start test1
test1 results:
--- lock kmem/bcache stats
lock: kmem: #fetch-and-add 83375 #acquire() 433015
lock: bcache: #fetch-and-add 0 #acquire() 1260
--- top 5 contended locks:
lock: kmem: #fetch-and-add 83375 #acquire() 433015    //锁竞争最激烈
lock: proc: #fetch-and-add 23737 #acquire() 130718
lock: virtio_disk: #fetch-and-add 11159 #acquire() 114
lock: proc: #fetch-and-add 5937 #acquire() 130786
lock: proc: #fetch-and-add 4080 #acquire() 130786
tot= 83375
test1 FAIL

​ 根据实验提示,可以为每一个CPU都声明一个freelist锁。CPU申请分配或释放内存,不会影响到另一个CPU执行相同的操作,因此给当前CPU上锁,不会影响到另一个CPU。代码如下(stealing_lock下面解释):

struct {struct spinlock lock;struct spinlock stealing_lock;  // 防止死锁问题,添加一个偷锁struct run* freelist;
} kmem[NCPU];                       // 每个CPU分配独立的freelist,多个CPU并发分配物理内存不会相互竞争char* kmem_lock_names[] = {"kmem_cpu_0","kmem_cpu_1","kmem_cpu_2","kmem_cpu_3","kmem_cpu_4","kmem_cpu_5","kmem_cpu_6","kmem_cpu_7",
};void
kinit()
{for (int i = 0;i < NCPU;++i) {initlock(&kmem[i].lock, kmem_lock_names[i]);initlock(&kmem[i].stealing_lock, kmem_lock_names[i] + 's');}freerange(end, (void*)PHYSTOP);
}

​ 相应的释放内存的代码:

void
kfree(void *pa)
{......r = (struct run*)pa;push_off();int cpu = cpuid();       // 获取cpu编号,中断关闭时调用cpuid才是安全的,所以上面用push_off关闭中断acquire(&kmem[cpu].lock);         //将释放的页插入当前CPU的freelist中r->next = kmem[cpu].freelist;kmem[cpu].freelist = r;release(&kmem[cpu].lock);pop_off();                //重新打开中断
}

​ 分配内存kalloc的时候,可能会出现,在当前CPU已经没有freelist的情况(该CPU空闲内存不足),需要从其他CPU偷内存页,涉及到共享数据的修改,所以分配内存时需要加锁,偷页的时候也要加上锁:

void *
kalloc(void)
{struct run *r;push_off();						//关闭中断int cpu = cpuid();acquire(&kmem[cpu].lock);if (!kmem[cpu].freelist) {        // 当前CPU已经没有freelist的时候,去其他CPU偷内存页int steal_left = 64;          // 这里指定偷64个内存页for (int i = 0;i < NCPU;++i) {if (i == cpu)continue;             // 跳过当前CPUacquire(&kmem[i].lock);if (!kmem[i].freelist) {      // 如果在想要偷页的cpu也没有freelist了,就释放锁跳过release(&kmem[i].lock);continue;}struct run* rr = kmem[i].freelist;while (rr && steal_left) {            // 循环将kmem[i]的freelist移动到kmem[cpu]中kmem[i].freelist = rr->next;rr->next = kmem[cpu].freelist;kmem[cpu].freelist = rr;rr = kmem[i].freelist;steal_left--;}release(&kmem[i].lock);if (steal_left)       // 偷到指定页数后退出循环break;}}r = kmem[cpu].freelist;if(r)kmem[cpu].freelist = r->next;release(&kmem[cpu].lock);pop_off();				//打开中断if (r)memset((char*)r, 5, PGSIZE); // fill with junkreturn (void*)r;
}

​ 这里会有一个问题:cpu1在持有自身的锁的时候去cpu2偷页,此时cpu2也在持有自身锁的时候去cpu1偷页,造成死锁。

​ 一种解决方案是,使用一个额外的锁stealing_lock(就叫他偷锁了),在要去偷页的时候,先加上偷锁,然后释放自己的freelist锁。偷完之后,先加上freelist锁,再释放偷锁,因此上面的代码修改一下:

...for (int i = 0;i < NCPU;++i) {if (i == cpu)continue;             // 跳过当前CPUacquire(&kmem[i].lock);if (!kmem[i].freelist) {      // 如果在想要偷页的cpu也没有freelist了,就释放锁跳过release(&kmem[i].lock);continue;}acquire(&kmem[cpu].stealing_lock);release(&kmem[cpu].lock);             // 加上偷锁,释放freelist锁,防止两个进程互相偷页时造成的死锁问题struct run* rr = kmem[i].freelist;while (rr && steal_left) {            // 循环将kmem[i]的freelist移动到kmem[cpu]中kmem[i].freelist = rr->next;rr->next = kmem[cpu].freelist;kmem[cpu].freelist = rr;rr = kmem[i].freelist;steal_left--;}acquire(&kmem[cpu].lock);release(&kmem[cpu].stealing_lock);release(&kmem[i].lock);if (steal_left)       // 偷到指定页数后退出循环break;}
...

​ 在xv6当前设计下,一个进程只要还拿着至少一个锁,就不会被从当前cpu上调度走,而偷锁又是该cpu独享的。所以实际上只要某个偷锁被一个进程拿着,就不会有第二个进程能拿到这个偷锁,从而不会出现环路等待,从根本上消除死锁的可能性

​ 现在可以验证实验是否正确

Buffer cache(hard)

修改块缓存,以便在运行bcachetest时,bcache(buffer cache的缩写)中所有锁的acquire循环迭代次数接近于零。理想情况下,块缓存中涉及的所有锁的计数总和应为零,但只要总和小于500就可以。修改bgetbrelse,以便bcache中不同块的并发查找和释放不太可能在锁上发生冲突(例如,不必全部等待bcache.lock)。你必须保护每个块最多缓存一个副本的不变量。完成后,您的输出应该与下面显示的类似(尽管不完全相同)。确保usertests仍然通过。完成后,make grade应该通过所有测试。

该实验很多细节地方需要考虑,完整、专业的讲解还请看 参考博客 。该帖是按自己的理解整理归纳

​ bcache.lock用来保护高速缓存区的缓存块,多个进程不能同时操作磁盘缓存

​ bcache 中的区块缓存是会被多个进程、多个CPU共享的(多个进程可以同时访问同一个区块),所以不能模仿上一个实验,为每个CPU分配专属的块

static struct buf*
bget(uint dev, uint blockno)
{struct buf *b;acquire(&bcache.lock);// Is the block already cached?for(b = bcache.head.next; b != &bcache.head; b = b->next){if(b->dev == dev && b->blockno == blockno){b->refcnt++;release(&bcache.lock);acquiresleep(&b->lock);return b;}}// Not cached.// Recycle the least recently used (LRU) unused buffer.for(b = bcache.head.prev; b != &bcache.head; b = b->prev){if(b->refcnt == 0) {b->dev = dev;b->blockno = blockno;b->valid = 0;b->refcnt = 1;release(&bcache.lock);acquiresleep(&b->lock);return b;}}panic("bget: no buffers");
}

​ 原本的设计中,当想要获取一个buf的时候,会给整个缓存区上锁,根据块号blockno查找对应块是否已经缓存区,若在的话就讲块引用数+1,释放锁返回。不在的话,就遍历链表,将最近最久未使用的切引用数为0的buf(后面称为LRU-buf)作为缓存区块,释放锁返回该区块

​ 改进方案为:建立一个由blocknodev到buf的哈希表,通过一个特定的哈希公式映射到哈希桶,由此在每个哈希桶上加锁。这样,只有在两个进程同时访问的区块同时哈希到同一个桶的时候,才会发生锁竞争。当桶中的空闲 buf 不足的时候,从其他的桶中获取 buf

​ 先定义哈希表相关:

// kernel/bio.c
// 哈希表中的桶号索引。根据提示,设置质数个桶可以降低哈希冲突的可能性
#define NBUFMAP_BUCKET 13
// 哈希索引
#define BUFMAP_HASH(dev, blockno) ((((dev)<<27)|(blockno))%NBUFMAP_BUCKET)

​ 修改bcache结构体(驱逐锁后面解释),给每一个桶声明一个锁:

struct {
//   struct spinlock lock;struct buf buf[NBUF];struct spinlock eviction_lock;        // 驱逐锁// 哈希表struct buf bufmap[NBUFMAP_BUCKET];struct spinlock bufmap_locks[NBUFMAP_BUCKET];     // 桶锁
} bcache;

​ 这时候的设计思路是:在bget中,上当前的桶锁,索引到哈希表中看对应buf是否存在,存在就直接释放锁返回该块,不存在就在所有桶寻找一个LRU-buf,寻找的时候上对应的桶锁,找到后就释放这个桶锁,查询结束将该buf移出他原本所在的桶(称为缓存驱逐),然后加入到blockno对应的桶中,释放锁返回该地址。

​ 这样会产生的问题如下(下面的描述会很抽象,因为这是我按自己理解概括了的 参考博客 的内容,省略了大量内容,如果不理解请去原博客学习)

1.查询时刻的buf和驱逐时刻的buf状态不一致。查询的时候该buf符合要求,但是查询完对应的桶之后会释放该桶锁,释放之后该buf就可以被其他进程访问了,在驱逐的时候就不一定符合要求了。

​ 解决方法是,每一次查询桶都记录该桶的桶号,查询完该桶,如果找到新的LRU-buf就不释放该桶锁,直到驱逐完再解锁

2.两个进程查询桶时会造成环路死锁。如:CPU1在桶1中拿着1锁,要去桶2查询,CPU2在桶2拿着2锁,要去桶1查询,CPU1拿不到2锁,CPU2也拿不到1锁,就会造成死锁。

​ 解决方法是,去查询其他桶之前,释放自己当前的桶锁,再去查询,查询结束后,再获取桶锁,把LRU-buf加入桶,这样就可以避免死锁

​ 但是这样会引入新的问题:CPU检查blockno的buf是否存在缓存区,不在的话就释放桶锁去查询其他桶。因为释放了该桶的桶锁,其他CPU也会通过同样的blockno索引到相同的桶中,拿到桶锁,检查到buf不在缓存区,然后释放锁区查询桶,也就是同样的操作执行多次,会导致一个区块有多份缓存的情况。

​ 原博客的解决方法是,牺牲一点效率保证安全:添加一个新的锁eviction_lock,驱逐锁。释放桶锁后,加上驱逐锁(注意顺序),然后马上再次判断blockno的buf是否存在缓存区,确保不会创建重复的缓存buf。若不存在,就开始执行查询操作。最后把buf添加到桶后才释放锁

​ 这样,即使有多个线程同时用同一个blockno访问同一个桶,并都检查到blockno的buf不在缓存区,也都只会有一个线程能拿到线程锁,然后去其他桶拿buf,把buf放到自己的桶后才释放驱逐锁,其他被驱逐锁卡住的线程拿到驱逐锁后也会先检查buf是否存在缓存区,这个时候可以查询到,直接释放锁返回。

​ 这样做的好处:保证了查找过程中不会出现死锁,并且不会出现极端情况下一个块产生多个缓存的情况。坏处:驱逐锁相当于全局锁,使得原本可并发的遍历驱逐过程的并行性降低了。并且每一次 cache miss 的时候,都会多一次额外的桶遍历开销。

修改后的代码:

// kernel/buf.h
struct buf {int valid;   // has data been read from disk?int disk;    // does disk "own" buf?uint dev;uint blockno;struct sleeplock lock;uint refcnt;// struct buf *prev; // LRU cache liststruct buf *next;uchar data[BSIZE];uint lastuse;     //用于跟踪LRU-buf
};
// kernel/bio.c
// 哈希表中的桶号索引
#define NBUFMAP_BUCKET 13
// 哈希索引
#define BUFMAP_HASH(dev, blockno) ((((dev)<<27)|(blockno))%NBUFMAP_BUCKET)struct {
//   struct spinlock lock;struct buf buf[NBUF];struct spinlock eviction_lock;        // 驱逐锁// 哈希表struct buf bufmap[NBUFMAP_BUCKET];struct spinlock bufmap_locks[NBUFMAP_BUCKET];     // 桶锁
} bcache;void
binit(void)
{// 初始化桶锁for (int i = 0;i < NBUFMAP_BUCKET;++i) {initlock(&bcache.bufmap_locks[i], "bcache_bufmap");bcache.bufmap[i].next = 0;}for (int i = 0;i < NBUF;++i) {// 初始化缓存区块struct buf* b = &bcache.buf[i];initsleeplock(&b->lock, "buffer");b->lastuse = 0;b->refcnt = 0;// 将所有缓存区块添加到bufmap[0]b->next = bcache.bufmap[0].next;bcache.bufmap[0].next = b;}initlock(&bcache.eviction_lock, "bcache_eviction");
}// Look through buffer cache for block on device dev.
static struct buf*
bget(uint dev, uint blockno)
{struct buf *b;uint key = BUFMAP_HASH(dev, blockno);acquire(&bcache.bufmap_locks[key]);// blockno的缓存区块是否已经在缓存区中for (b = bcache.bufmap[key].next;b;b = b->next) {if(b->dev == dev && b->blockno == blockno){b->refcnt++;release(&bcache.bufmap_locks[key]);acquiresleep(&b->lock);return b;}}// 不在缓存区// 为了防止死锁,先释放当前桶锁release(&bcache.bufmap_locks[key]);// 为了防止blocknod的缓存区块被重复创建,加上驱逐锁acquire(&bcache.eviction_lock);// 释放桶锁~加驱逐锁的间隙可能创建了blocknod的缓存区块,因此再检查一次for (b = bcache.bufmap[key].next;b;b = b->next) {if (b->dev == dev && b->blockno == blockno) {acquire(&bcache.bufmap_locks[key]);     // 添加引用次数时必须加上桶锁b->refcnt++;release(&bcache.bufmap_locks[key]);release(&bcache.eviction_lock);acquiresleep(&b->lock);return b;}}// 仍然不在缓存区// 此时只持有驱逐锁,不持有任何桶锁。查询所有桶中的LRU-bufstruct buf* before_least = 0;     // LRU-buf的前一个块uint holding_bucket = -1;         //记录当前持有哪个桶锁// 循环查询所有桶for (int i = 0;i < NBUFMAP_BUCKET;++i) {acquire(&bcache.bufmap_locks[i]);     // 获取当前遍历的桶锁(在找到下一个LRU-buf或驱逐内存之前都不释放)int newfound = 0;     // 是否在当前桶找到的新的LRU-buffor (b = &bcache.bufmap[i];b->next;b = b->next) {if (b->next->refcnt == 0 && (!before_least || b->next->lastuse < before_least->next->lastuse)) {before_least = b;newfound = 1;}}if (!newfound)                            // 如果没找到找到新的LRU-buf,就释放当前的桶锁release(&bcache.bufmap_locks[i]);else {                                                    // 找到了新的LRU-bufif (holding_bucket != -1)                             // 如果当前找到的不是第一个LRU-buf,之前肯定持有某个桶锁,需要释放  release(&bcache.bufmap_locks[holding_bucket]);holding_bucket = i;                                   // 把标记 holding_bucket 更改成当前桶锁编号}}// 如果没找到任何一个LRU-buf,表示没有空闲缓存块了if (!before_least)panic("bget: no buffuers");b = before_least->next;           // b=LRU-bufif (holding_bucket != key) {      // 想要偷的块如果不在key桶,就要把块从他所在的桶驱逐出来before_least->next = b->next;release(&bcache.bufmap_locks[holding_bucket]);//将LRU-buf添加到key桶acquire(&bcache.bufmap_locks[key]);b->next = bcache.bufmap[key].next;bcache.bufmap[key].next = b;}// 设置新buf的字段b->dev = dev;b->blockno = blockno;b->refcnt = 1;b->valid = 0;// 可以释放相关锁了release(&bcache.bufmap_locks[key]);release(&bcache.eviction_lock);acquiresleep(&b->lock);return b;
}void
brelse(struct buf *b)
{if(!holdingsleep(&b->lock))panic("brelse");releasesleep(&b->lock);uint key = BUFMAP_HASH(b->dev, b->blockno);acquire(&bcache.bufmap_locks[key]);b->refcnt--;if (b->refcnt == 0) {b->lastuse = ticks;}release(&bcache.bufmap_locks[key]);
}void
bpin(struct buf* b) {uint key = BUFMAP_HASH(b->dev, b->blockno);acquire(&bcache.bufmap_locks[key]);b->refcnt++;release(&bcache.bufmap_locks[key]);
}void
bunpin(struct buf* b) {uint key = BUFMAP_HASH(b->dev, b->blockno);acquire(&bcache.bufmap_locks[key]);b->refcnt--;release(&bcache.bufmap_locks[key]);
}

与原博客相比省略的细节:锁竞争优化思路伪代码描述思路死锁基础悲观乐观锁讨论区很多优质讨论。还是推荐去原博客学习,毕竟这只是我的学习记录__(:з」∠)_

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/833156.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于HASM模型的高精度建模matlab仿真

1.程序功能描述 本课题主要使用HASM进行高精度建模,主要对HASM模型进行介绍以及在实际中如何进行简化实现的。HASM原始的模型如下所示: 2.测试软件版本以及运行结果展示MATLAB2022A版本运行 3.核心程序%第一类基本变量E(i,j) = 1 + (( f(i,j+1,n) - f(i,j-1,n) )/( …

CICD04 Jenkins容器化CICD实现及分布式构建, 流水线Pipeline ubuntu使用

2.14.3 案例: 基于 Docker 插件实现自由风格任务实现 Docker 镜像 制作 不如前面的直接脚本编写灵活 2.14.3.2 安装插件 docker-build-step jenkins上安装 docker-build-step 插件#选择jenkins使用的docker服务 #左侧系统管理,右侧系统配置,Docker Builder下Docker URL输入 u…

数据类型和运算符

数据类型 动态类型编程语言运行时判断静态类型的编程语言 : Go 、C 、在开发的时候,就需要给一些定义的变量赋值空间大小。C 需要自己去开辟这个空间数据类型 : 每种在Go语言中出现的基本数据类型,会有一个默认的空间大小。 1、布尔类型数据 布尔型的值只可以是常量 true 或…

XXL JOB DockerCompose部署

官网给的方式是 Docker 命令启动,但是用起来太麻烦了,所以用DockerCompose 简化部署 创建数据库,导入 SQL SQL 脚本位置为/xxl-job/doc/db/tables_xxl_job.sql https://raw.githubusercontent.com/xuxueli/xxl-job/refs/heads/master/doc/db/tables_xxl_job.sql 编写 Docker…

CICD02 Jenkins安装,备份还原, 实现CICD核心功能 ubuntu使用

DevOps 之 CICD 服务器 Jenkins 1 Jenkins 部署与基本配置 1.2 Jenkins 安装和启动 1.2.1 Jenkins 的安装 Jenkins支持多种安装方法 1.包安装 2.JAVA的WAR文件 #要手动配置,不太方便 3.容器运行#系统要求 最低推荐配置:1.256MB可用内存2.1GB可用磁盘空间(作为一个Docker容…

CICD01 Git, GitLab, 部署方式 ubuntu使用

版本管理系统 Git 和 GitLab 1 DevOps 简介 1.3 持续集成、持续交付和持续部署 CICD CICD: 持续集成, 持续交付, 持续部署 1.6 常见的软件部署模式 生产中 蓝绿部署 和 金丝雀用的比较多 1.6.1 蓝绿部署 Blue-green Deployments 一个和生产环境一样的预发布环境, 和生产环…

jvm 垃圾回收算法的评价标准

如何实现回收的(核心思想): 1. 找到内存中存活的对象(与GC Root相关联) 2. 释放不再存活对象的内存,使得程序能再次利用这部分空间 --------------------------------------------------------------------------------- 垃圾回收算法的分类: -------- ----------------…

GO面试-切片

一、结构介绍 切片(Slice)在 Go 语言中,有一个很常用的数据结构,切片是一个拥有相同类型元素的可变长度的序列,它是基于数组类型做的一层封装。它非常灵活,支持自动扩容。并发不安全。 切片是一种引用类型,它有三个属性:指针,长度和容量。 底层源码定义: type slice …

系统管理体系——软件包管理

1.Linux系统管理体系——软件包管理Linux下面的软件包格式为:rpm格式(红帽系列系统,CentOS,麒麟系统)或deb格式(Debian,Ubuntu)安装软件方式 举例 说明 应用场景yum/apt 方式 点外卖,缺啥少啥,外卖解 决 通过网络下载软件包,替我们安装, 如果 有依赖自动下载依赖并安装. …

Linux12位权限管理体

1. Linux12位权限管理体 1.1 权限管理概述Linux通过rwx3种权限控制系统与保护系统,组成9位权限. Linux权限体系中还有3位特殊权限,组合起来就是12位权限体系. Linux这简单的rwx控制整个Linux系统的安全,权限与用户共同组成Linux系统的安全防护体系.1.2 Linux权限计算 2.0 rwx权…

Java流程控制(三)

用户交互Scanner(java.util.Scanner获取用户的输入)//基本语法 Scanner s = new Scanner(System.in)通过Scanner类的next()与nextLine()方法获取输入的字符串,使用hasNext()与hasNextLine()判断是否还有输入的数据(Next不能得到带有空格的字符串,NextLine可以获得空白)im…

0.1+0.2=0.30000000000000004

看下效果这个网站能找到你想要的答案 https://0.30000000000000004.com/ 十进制转二进制 十进制整数转换为二进制整数采用"除2取余,逆序排列"法。 具体做法是:用2整除十进制整数,可以得到一个商和余数; 再用2去除商,又会得到一个商和余数,如此进行,直到商为小…