本次 lab 是要从头规划 xv6 的内存分配器和块缓存来获取更高的并行性。要添加并行性通常要规划到更改数据结构和上锁策略来削减竞态。
Memory allocator (moderate)
本来的 allocator 中只要一个独自的 freelist,也便是说 page 的分配和开释都会频繁的发生竞赛。所以给每个 CPU 设置一个独自的 freelist 和一个独自的锁。所以进程在不同的 CPU 上请求和分配 page 就能够并行运行了。假如当时 CPU 上 freelist 的为空的时分,就要去其他 CPU 那里盗取 page,这个时分依旧会发生竞赛。
代码完成
首要修正 CPU 结构体,在每个 CPU 中参加一个之前的 kmem,让每个 CPU 能够独自的分配内存。
struct run {
struct run *next;
};
struct kmem {
struct spinlock lock;
struct run *freelist;
};
// Per-CPU state.
struct cpu {
struct proc *proc; // The process running on this cpu, or null.
struct context context; // swtch() here to enter scheduler().
struct kmem kmem;
int noff; // Depth of push_off() nesting.
int intena; // Were interrupts enabled before push_off()?
};
经过 NCPU 参数能够知道,有 8 个 CPU,所以在初始化 alloctor 的时分将 freepage 分红 8 份,别离放入 8 个 CPU 的 kmem 中。其实也能够初始化的时分都放到一个 CPU 里,然后其它 CPU 去偷。可是我感觉这样会有一个冷启动的进程,不如在初始化的时分直接先分配了。
void
kinit()
{
uint64 offset = (PHYSTOP - (uint64)end)/8;
uint64 left, right;
left = (uint64) end;
right = left + offset;
for (int i = 0; i < NCPU; i++) {
struct cpu *c = &cpus[i];
initlock(&c->kmem.lock, "kmem");
char *p;
p = (char*)PGROUNDUP(left);
for(; p + PGSIZE <= (char*)right; p += PGSIZE) {
struct run *r;
if(((uint64)p % PGSIZE) != 0 || (char*)p < end || (uint64)p >= PHYSTOP)
panic("kfree");
// Fill with junk to catch dangling refs.
memset(p, 1, PGSIZE);
r = (struct run*)p;
r->next = c->kmem.freelist;
c->kmem.freelist = r;
}
left = right;
right = right + offset;
}
}
然后修正 kfree,在开释的时分请求当时 CPU 的 kmem 锁,将 page 开释到当时 CPU 的 freelist 中即可,留意开关中止。
void
kfree(void *pa)
{
struct run *r;
if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");
// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);
r = (struct run*)pa;
push_off();
struct cpu *c = mycpu();
acquire(&c->kmem.lock);
r->next = c->kmem.freelist;
c->kmem.freelist = r;
release(&c->kmem.lock);
pop_off();
}
最终是 kalloc,在分配的时分因为当时 CPU 的 freelist 或许现已为空了,那么就需要去其它 CPU 那里偷。
void *
kalloc()
{
push_off();
struct run *r;
int cid = cpuid();
struct cpu *c = &cpus[cid];
acquire(&c->kmem.lock);
r = c->kmem.freelist;
if(r) {
c->kmem.freelist = r->next;
release(&c->kmem.lock);
} else {
release(&c->kmem.lock);
for (int i = 0; i < NCPU; i++) {
if (i == cid) continue;
struct cpu *nc = &cpus[i];
acquire(&nc->kmem.lock);
r = nc->kmem.freelist;
if (r) {
acquire(&c->kmem.lock);
nc->kmem.freelist = r->next;
release(&c->kmem.lock);
release(&nc->kmem.lock);
break;
} else {
release(&nc->kmem.lock);
continue;
}
}
}
if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
pop_off();
return (void*)r;
}
去其它 CPU 那里偷的时分,因为要获取其它 CPU 的锁,所以会发生竞赛。还有一种状况便是当时进程持有当时 CPU 的锁去偷其它 CPU 的时分,那个被偷的 CPU 或许也正在测验偷当时 CPU,这样就会发生死锁。
能够选择在去其他 CPU 那里偷之前把当时 CPU 的锁开释掉,这样就不会发生死锁,破坏了死锁的必要条件。我在大佬的博客上看到说假如这个时分开释了锁,就会导致重复盗取 page。我想了一下,假如说在开释当时 CPU 的 kmem 锁之后,该进程被调度走了,另外一个进程又过来履行 kalloc,也发现当时 CPU 的 freelist 是空的,也会进行盗取。可是我感觉当时现已是关中止了,那么时钟中止也会被屏蔽,当时进程就不会被调度了,所以应该不会发生这种状况吧。这一块没有太懂,不过能经过测验,暂时先这样。
Buffer cache (hard)
这部分要做的事情跟榜首部分的意图是相同的,修正 bcache 结构体,并降低锁的粒度。将本来的用一个大锁锁住整个双向链表的规划,拆分红一个 Hash Table,每个 bucket 一个锁,那么就能够使得拜访不同 bucket 的进程并行拜访 buffer cache。
代码完成
依据 hints,最基本的思路是很简单想到的。将 buffer 以哈希表的形式安排,bucket 的数量为 13 个,每个 bucket 需要一把锁。而且把本来的双向链表规划去掉,本来的双向链表是为了完成 LRU,但现在咱们经过给每个 buffer 记载时刻戳来完成 LRU,时刻戳就为 kernel/trap.c 中的 ticks。
所以 bcache 结构改为以下形式:
struct {
struct spinlock lock;
struct buf buf[NBUF];
struct buf buckets[NBUCKET];
struct spinlock bucketlocks[NBUCKET];
} bcache;
在 buf 结构体中去掉 prev 指针,并参加 timestamp 字段,用于完成 LRU:
struct buf {
int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt;
uint timestamp;
struct buf *prev; // LRU cache list
struct buf *next;
uchar data[BSIZE];
};
这样一来就不需要保护双向链表了,binit 的完成也很简单,只需要初始化锁而且将 bcache.buf 中的所有 buffer 都放进 bcache.buckets[0],并初始化每个 buf 的 sleeplock 即可。
void
binit(void)
{
struct buf *b;
initlock(&bcache.lock, "bcache");
int i;
for (i = 0; i < NBUCKET; i++) {
initlock(&bcache.bucketlocks[i], "bcache");
}
for (b = bcache.buf; b < bcache.buf+NBUF; b++) {
initsleeplock(&b->lock, "buffer");
b->next = bcache.buckets[0].next;
bcache.buckets[0].next = b;
}
}
在我最开端的完成中,bcache.lock 是用不上的(所以也没跑出正确答案)。可是依据 Hints:
Your solution might need to hold two locks in some cases; for example, during eviction you may need to hold the bcache lock and a lock per bucket. Make sure you avoid deadlock.
是需要上两把锁的,具体运用场景到后边再看。
因为咱们现在不需要保护双向链表,在 brelse 的时分直接开释 buf 的 sleep 的 lock,然后再获取对应的 bucket 的 lock,将 refcnt– 即可,假如 refcnt == 0,则将当时的 ticks 设置为 buf 的 timestamp。
void
brelse(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("brelse");
releasesleep(&b->lock);
int key = HASH_DEV_BLOCKNO(b->dev, b->blockno);
acquire(&bcache.bucketlocks[key]);
b->refcnt--;
if (b->refcnt == 0) {
b->timestamp = ticks;
}
release(&bcache.bucketlocks[key]);
}
相同的修正 bpin 和 bunpin,这儿就不贴出来了。
bget
整个 bget 是最摧残的当地,多线程场景下的问题仍是太难发现了。
最开端我没有意识到问题的严重性,按照最初的思路,写了榜首版代码,逻辑如下:
- 获取 dev 和 blockno 对应的 bucket 的锁;
- 假如现已有缓存了就直接回来 buf,不然履行 3;
- 在所有的 bucket 中找出一个 refcnt 为 0,而且最近最久未运用的 buf。在这个进程中需要对 bucket 上锁,遍历完一个 bucket 之后,开释它的锁;
- 假如找到了这样的 buf,就请求它地点的 bucket 锁并进行 eviction,将它从本来的 bucket 中移除,不然直接 panic;
- 将找到的 buf 刺进 dev 和 blockno 对应的 bucket 中,并设置 buf 的值;
- 开释 dev 和 blockno 对应的 bucket 的锁;
- 获取找到的 buf 的 sleeplock;
- 回来 buf;
这样乍一看是没什么问题,可是事实并非如此。
首要一个简单留意到的点便是,会发生死锁。因为咱们在发现没有缓存的时分,并没有开释刚刚获取的 bucket 的锁,然后就开端在所有 bucket 中找一个能够被 evict 的 buf,这时分要获取其它 bucket 的锁。那么假如一个进程 A 持有 bucket1 的锁,又去获取 bucket 2 的锁,可是进程 B 又持有 bucket2 的锁去获取 bucket1 的锁时,就发生了死锁。
所以在发现咱们要找的 buf 并没有在缓存中时,要先开释当时持有的 bucket 的锁,再去进行 eviction。
第二个问题便是,当咱们找到了那个能够被 evict 的 buf 时,将锁开释掉了,正式进行 evict 时,又去请求锁,可是在开释锁到从头请求锁的这个空隙,或许有其它进程又引证了刚刚找出来的那个 buf,使得它的 refcnt 不为 1,这个时分将其 evict 掉就会发生过错。
处理方案便是,在找出能够 evict 的 buf 后,不开释对应 bucket 的锁,而是直到 evict 之后再开释。那么其它进程在刚开端获取缓存的时分就会堵塞,因为它获取不到这个 bucket 的锁,待到 ecivt 完毕后,它才能去检查 bucket,这时分它就看不到那个被 evict 的 buf 了。
完成以上两个修正之后,bcachetest 现已能够经过了。可是假如履行 usertests,榜首个 manywrites 就无法经过,报的过错是 panic: freeing free block,即开释了一个本来现已开释的 block cache。而这种重复开释的原因,必定便是同一个 block 被缓存了屡次。
下面的思路来自于Miigon。
假定当时有两个进程一起拜访相同缓存块,在榜首个进程获取到对应的 bucket 锁后,发现不存在缓存,就开释了锁,进入寻觅可 evict 的 buf 阶段。这时,第二个进程相同能够获取对应 bucket 的锁,并相同发现缓存不存在,也进入寻觅可 evict 的 buf 阶段。
这个时分完全有或许发生的是,它们找到了两个不同的 buf,而且将它们 evict 掉之后都刺进了它们要找的 block 对应的 bucket 中。这时就呈现了同一块 block 有多个 cache 的状况。最终也就会触发 freeing free block。
这时分前文说到的那个 hint 就要回收了,再看一遍:
Your solution might need to hold two locks in some cases; for example, during eviction you may need to hold the bcache lock and a lock per bucket. Make sure you avoid deadlock.
再结合前一条:
It is OK to serialize eviction in bget (i.e., the part of bget that selects a buffer to re-use when a lookup misses in the cache).
现在想想其实便是暗示咱们在寻觅并 evict 可用 buf 的时分将整个流程串行化,保存最开端检查是否有缓存的并行。而且缓存丢掉的概率一般来说都是非常低的,所以后边的串行化形成的性能损失其实是能够承受的。
所以现在利用上 bcache.lock,在发现缓存不存在开释掉对应 bucket 的锁之后,马上获取 bcache.lock 这把大锁。因为在开释对应 bucket 锁到获取 bcache.lock 期间,或许有其他进程现已完成了对咱们要找的 block 设置 cache,所以在获取完 bcache.lock 后,再进行一次查找缓存,假如发现现已存在了,就直接回来对应的 buf。
这样一来,就算有多个进程一起进入 bget,也只要榜首个进程能够获取到 bcache.lock,并完成缓存的设置,后边的进程都会被获取完锁后的第二轮查找缓存给拦住。这样就避免了一个 block 被缓存屡次。
下面是完整的代码:
static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b;
int key = HASH_DEV_BLOCKNO(dev, blockno);
acquire(&bcache.bucketlocks[key]);
// Is the block already cached?
for (b = bcache.buckets[key].next; b; b = b->next) {
if (b->dev == dev && b->blockno == blockno) {
b->refcnt++;
release(&bcache.bucketlocks[key]);
acquiresleep(&b->lock);
return b;
}
}
release(&bcache.bucketlocks[key]);
// 一定要先开释再去获取 bcache.lock
// 不然假如另一个进程持有 bcache.lock,再在下面获取 bucketlocks[key] 就会死锁
acquire(&bcache.lock);
for (b = bcache.buckets[key].next; b; b = b->next) {
if (b->dev == dev && b->blockno == blockno) {
acquire(&bcache.bucketlocks[key]);
b->refcnt++;
release(&bcache.bucketlocks[key]);
release(&bcache.lock);
acquiresleep(&b->lock);
return b;
}
}
// Not cached.
int i, bidx = -1, flag = 0;
struct buf *tmp = 0;
for (i = 0; i < NBUCKET; i++) {
acquire(&bcache.bucketlocks[i]);
for (b = &bcache.buckets[i]; b->next; b = b->next) {
if (b->next->refcnt == 0 && (!tmp || b->next->timestamp > tmp->next->timestamp)) {
tmp = b;
flag = 1;
}
}
if (!flag) {
release(&bcache.bucketlocks[i]); // 假如没有找到 buf 就开释锁
} else {
// 假如找到了新 buf,那就开释之前找到的 buf 的 bucket 锁,并保存当时 bucket 的锁
flag = 0;
if (bidx != -1) release(&bcache.bucketlocks[bidx]);
bidx = i;
}
}
if (!tmp) {
panic("bget: no buffers");
}
b = tmp->next;
tmp->next = b->next;
release(&bcache.bucketlocks[bidx]); // 进行 evict 后再开释锁
acquire(&bcache.bucketlocks[key]);
b->next = bcache.buckets[key].next;
bcache.buckets[key].next = b;
b->dev = dev;
b->blockno = blockno;
b->valid = 0;
b->refcnt = 1;
release(&bcache.bucketlocks[key]);
release(&bcache.lock);
acquiresleep(&b->lock);
return b;
}
测验结果
总结
多线程场景下的问题真的很难搞,感觉在事务场景下,用一些常见的形式还没有那么难。到系统编程的场景下,这些问题都要自己考虑,处理一个问题之后或许又发生另一个问题,我自己对于这方面的能力仍是太弱了。
引证一下Miigon大佬的总结:
锁竞赛优化一般有几个思路:
- 只在有必要同享的时分同享(对应为将资源从 CPU 同享拆分为每个 CPU 独立)
- 有必要同享时,尽量削减在要害区中停留的时刻(对应“大锁化小锁”,降低锁的粒度)
榜首种思路便是本次 lab 的榜首部分,只要当时 CPU 的 freelist 现已为空时,才去和其它的 CPU 同享 freelist,其它状况下都是并行履行的。
第二种思路便是本次 lab 的第二部分,bcache 是没有办法独自划分给每个 CPU 的,属于有必要同享,所以只能经过缩小临界区,缩小锁的粒度来完成。
后续这方面仍是得多多加强。