本次 lab 是要从头规划 xv6 的内存分配器和块缓存来获取更高的并行性。要添加并行性通常要规划到更改数据结构和上锁策略来削减竞态。

Memory allocator (moderate)

本来的 allocator 中只要一个独自的 freelist,也便是说 page 的分配和开释都会频繁的发生竞赛。所以给每个 CPU 设置一个独自的 freelist 和一个独自的锁。所以进程在不同的 CPU 上请求和分配 page 就能够并行运行了。假如当时 CPU 上 freelist 的为空的时分,就要去其他 CPU 那里盗取 page,这个时分依旧会发生竞赛。

代码完成

首要修正 CPU 结构体,在每个 CPU 中参加一个之前的 kmem,让每个 CPU 能够独自的分配内存。

struct run {
  struct run *next;
};
struct kmem {
  struct spinlock lock;
  struct run *freelist;
};
// Per-CPU state.
struct cpu {
  struct proc *proc;          // The process running on this cpu, or null.
  struct context context;     // swtch() here to enter scheduler().
  struct kmem kmem;
  int noff;                   // Depth of push_off() nesting.
  int intena;                 // Were interrupts enabled before push_off()?
};

经过 NCPU 参数能够知道,有 8 个 CPU,所以在初始化 alloctor 的时分将 freepage 分红 8 份,别离放入 8 个 CPU 的 kmem 中。其实也能够初始化的时分都放到一个 CPU 里,然后其它 CPU 去偷。可是我感觉这样会有一个冷启动的进程,不如在初始化的时分直接先分配了。

void
kinit()
{
  uint64 offset = (PHYSTOP - (uint64)end)/8;
  uint64 left, right;
  left = (uint64) end;
  right = left + offset;
  for (int i = 0; i < NCPU; i++) {
    struct cpu *c = &cpus[i];
    initlock(&c->kmem.lock, "kmem");
    char *p;
    p = (char*)PGROUNDUP(left);
    for(; p + PGSIZE <= (char*)right; p += PGSIZE) {
      struct run *r;
      if(((uint64)p % PGSIZE) != 0 || (char*)p < end || (uint64)p >= PHYSTOP)
        panic("kfree");
      // Fill with junk to catch dangling refs.
      memset(p, 1, PGSIZE);
      r = (struct run*)p;
      r->next = c->kmem.freelist;
      c->kmem.freelist = r;
    }
    left = right;
    right = right + offset;
  }
}

然后修正 kfree,在开释的时分请求当时 CPU 的 kmem 锁,将 page 开释到当时 CPU 的 freelist 中即可,留意开关中止。

void
kfree(void *pa)
{
  struct run *r;
  if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
    panic("kfree");
  // Fill with junk to catch dangling refs.
  memset(pa, 1, PGSIZE);
  r = (struct run*)pa;
  push_off();
  struct cpu *c = mycpu();
  acquire(&c->kmem.lock);
  r->next = c->kmem.freelist;
  c->kmem.freelist = r;
  release(&c->kmem.lock);
  pop_off();
}

最终是 kalloc,在分配的时分因为当时 CPU 的 freelist 或许现已为空了,那么就需要去其它 CPU 那里偷。

void *
kalloc()
{
  push_off();
  struct run *r;
  int cid = cpuid();
  struct cpu *c = &cpus[cid];
  acquire(&c->kmem.lock);
  r = c->kmem.freelist;
  if(r) {
    c->kmem.freelist = r->next;
    release(&c->kmem.lock);
  } else {
    release(&c->kmem.lock);
    for (int i = 0; i < NCPU; i++) {
      if (i == cid) continue;
      struct cpu *nc = &cpus[i];
      acquire(&nc->kmem.lock);
      r = nc->kmem.freelist;
      if (r) {
        acquire(&c->kmem.lock);
        nc->kmem.freelist = r->next;
        release(&c->kmem.lock);
        release(&nc->kmem.lock);
        break;
      } else {
        release(&nc->kmem.lock);
        continue;
      }
    }
  }
  if(r)
    memset((char*)r, 5, PGSIZE); // fill with junk
  pop_off();
  return (void*)r;
}

去其它 CPU 那里偷的时分,因为要获取其它 CPU 的锁,所以会发生竞赛。还有一种状况便是当时进程持有当时 CPU 的锁去偷其它 CPU 的时分,那个被偷的 CPU 或许也正在测验偷当时 CPU,这样就会发生死锁。

能够选择在去其他 CPU 那里偷之前把当时 CPU 的锁开释掉,这样就不会发生死锁,破坏了死锁的必要条件。我在大佬的博客上看到说假如这个时分开释了锁,就会导致重复盗取 page。我想了一下,假如说在开释当时 CPU 的 kmem 锁之后,该进程被调度走了,另外一个进程又过来履行 kalloc,也发现当时 CPU 的 freelist 是空的,也会进行盗取。可是我感觉当时现已是关中止了,那么时钟中止也会被屏蔽,当时进程就不会被调度了,所以应该不会发生这种状况吧。这一块没有太懂,不过能经过测验,暂时先这样。

Buffer cache (hard)

这部分要做的事情跟榜首部分的意图是相同的,修正 bcache 结构体,并降低锁的粒度。将本来的用一个大锁锁住整个双向链表的规划,拆分红一个 Hash Table,每个 bucket 一个锁,那么就能够使得拜访不同 bucket 的进程并行拜访 buffer cache。

代码完成

依据 hints,最基本的思路是很简单想到的。将 buffer 以哈希表的形式安排,bucket 的数量为 13 个,每个 bucket 需要一把锁。而且把本来的双向链表规划去掉,本来的双向链表是为了完成 LRU,但现在咱们经过给每个 buffer 记载时刻戳来完成 LRU,时刻戳就为 kernel/trap.c 中的 ticks。

所以 bcache 结构改为以下形式:

struct {
  struct spinlock lock;
  struct buf buf[NBUF];
  struct buf buckets[NBUCKET];
  struct spinlock bucketlocks[NBUCKET];
} bcache;

在 buf 结构体中去掉 prev 指针,并参加 timestamp 字段,用于完成 LRU:

struct buf {
  int valid;   // has data been read from disk?
  int disk;    // does disk "own" buf?
  uint dev;
  uint blockno;
  struct sleeplock lock;
  uint refcnt;
  uint timestamp;
  struct buf *prev; // LRU cache list
  struct buf *next;
  uchar data[BSIZE];
};

这样一来就不需要保护双向链表了,binit 的完成也很简单,只需要初始化锁而且将 bcache.buf 中的所有 buffer 都放进 bcache.buckets[0],并初始化每个 buf 的 sleeplock 即可。

void
binit(void)
{
  struct buf *b;
  initlock(&bcache.lock, "bcache");
  int i;
  for (i = 0; i < NBUCKET; i++) {
    initlock(&bcache.bucketlocks[i], "bcache");
  }
  for (b = bcache.buf; b < bcache.buf+NBUF; b++) {
    initsleeplock(&b->lock, "buffer");
    b->next = bcache.buckets[0].next;
    bcache.buckets[0].next = b;
  }
}

在我最开端的完成中,bcache.lock 是用不上的(所以也没跑出正确答案)。可是依据 Hints:

Your solution might need to hold two locks in some cases; for example, during eviction you may need to hold the bcache lock and a lock per bucket. Make sure you avoid deadlock.

是需要上两把锁的,具体运用场景到后边再看。

因为咱们现在不需要保护双向链表,在 brelse 的时分直接开释 buf 的 sleep 的 lock,然后再获取对应的 bucket 的 lock,将 refcnt– 即可,假如 refcnt == 0,则将当时的 ticks 设置为 buf 的 timestamp。

void
brelse(struct buf *b)
{
  if(!holdingsleep(&b->lock))
    panic("brelse");
  releasesleep(&b->lock);
  int key = HASH_DEV_BLOCKNO(b->dev, b->blockno);
  acquire(&bcache.bucketlocks[key]);
  b->refcnt--;
  if (b->refcnt == 0) {
    b->timestamp = ticks;
  }
  release(&bcache.bucketlocks[key]);
}

相同的修正 bpin 和 bunpin,这儿就不贴出来了。

bget

整个 bget 是最摧残的当地,多线程场景下的问题仍是太难发现了。

最开端我没有意识到问题的严重性,按照最初的思路,写了榜首版代码,逻辑如下:

  1. 获取 dev 和 blockno 对应的 bucket 的锁;
  2. 假如现已有缓存了就直接回来 buf,不然履行 3;
  3. 在所有的 bucket 中找出一个 refcnt 为 0,而且最近最久未运用的 buf。在这个进程中需要对 bucket 上锁,遍历完一个 bucket 之后,开释它的锁;
  4. 假如找到了这样的 buf,就请求它地点的 bucket 锁并进行 eviction,将它从本来的 bucket 中移除,不然直接 panic;
  5. 将找到的 buf 刺进 dev 和 blockno 对应的 bucket 中,并设置 buf 的值;
  6. 开释 dev 和 blockno 对应的 bucket 的锁;
  7. 获取找到的 buf 的 sleeplock;
  8. 回来 buf;

这样乍一看是没什么问题,可是事实并非如此。

首要一个简单留意到的点便是,会发生死锁。因为咱们在发现没有缓存的时分,并没有开释刚刚获取的 bucket 的锁,然后就开端在所有 bucket 中找一个能够被 evict 的 buf,这时分要获取其它 bucket 的锁。那么假如一个进程 A 持有 bucket1 的锁,又去获取 bucket 2 的锁,可是进程 B 又持有 bucket2 的锁去获取 bucket1 的锁时,就发生了死锁。

所以在发现咱们要找的 buf 并没有在缓存中时,要先开释当时持有的 bucket 的锁,再去进行 eviction。

第二个问题便是,当咱们找到了那个能够被 evict 的 buf 时,将锁开释掉了,正式进行 evict 时,又去请求锁,可是在开释锁到从头请求锁的这个空隙,或许有其它进程又引证了刚刚找出来的那个 buf,使得它的 refcnt 不为 1,这个时分将其 evict 掉就会发生过错。

处理方案便是,在找出能够 evict 的 buf 后,不开释对应 bucket 的锁,而是直到 evict 之后再开释。那么其它进程在刚开端获取缓存的时分就会堵塞,因为它获取不到这个 bucket 的锁,待到 ecivt 完毕后,它才能去检查 bucket,这时分它就看不到那个被 evict 的 buf 了。

完成以上两个修正之后,bcachetest 现已能够经过了。可是假如履行 usertests,榜首个 manywrites 就无法经过,报的过错是 panic: freeing free block,即开释了一个本来现已开释的 block cache。而这种重复开释的原因,必定便是同一个 block 被缓存了屡次。

下面的思路来自于Miigon。

假定当时有两个进程一起拜访相同缓存块,在榜首个进程获取到对应的 bucket 锁后,发现不存在缓存,就开释了锁,进入寻觅可 evict 的 buf 阶段。这时,第二个进程相同能够获取对应 bucket 的锁,并相同发现缓存不存在,也进入寻觅可 evict 的 buf 阶段。

这个时分完全有或许发生的是,它们找到了两个不同的 buf,而且将它们 evict 掉之后都刺进了它们要找的 block 对应的 bucket 中。这时就呈现了同一块 block 有多个 cache 的状况。最终也就会触发 freeing free block。

这时分前文说到的那个 hint 就要回收了,再看一遍:

Your solution might need to hold two locks in some cases; for example, during eviction you may need to hold the bcache lock and a lock per bucket. Make sure you avoid deadlock.

再结合前一条:

It is OK to serialize eviction in bget (i.e., the part of bget that selects a buffer to re-use when a lookup misses in the cache).

现在想想其实便是暗示咱们在寻觅并 evict 可用 buf 的时分将整个流程串行化,保存最开端检查是否有缓存的并行。而且缓存丢掉的概率一般来说都是非常低的,所以后边的串行化形成的性能损失其实是能够承受的。

所以现在利用上 bcache.lock,在发现缓存不存在开释掉对应 bucket 的锁之后,马上获取 bcache.lock 这把大锁。因为在开释对应 bucket 锁到获取 bcache.lock 期间,或许有其他进程现已完成了对咱们要找的 block 设置 cache,所以在获取完 bcache.lock 后,再进行一次查找缓存,假如发现现已存在了,就直接回来对应的 buf。

这样一来,就算有多个进程一起进入 bget,也只要榜首个进程能够获取到 bcache.lock,并完成缓存的设置,后边的进程都会被获取完锁后的第二轮查找缓存给拦住。这样就避免了一个 block 被缓存屡次。

下面是完整的代码:

static struct buf*
bget(uint dev, uint blockno)
{
  struct buf *b;
  int key = HASH_DEV_BLOCKNO(dev, blockno);
  acquire(&bcache.bucketlocks[key]);
  // Is the block already cached?
  for (b = bcache.buckets[key].next; b; b = b->next) {
    if (b->dev == dev && b->blockno == blockno) {
      b->refcnt++;
      release(&bcache.bucketlocks[key]);
      acquiresleep(&b->lock);
      return b;
    }
  }
  release(&bcache.bucketlocks[key]);
  // 一定要先开释再去获取 bcache.lock
  // 不然假如另一个进程持有 bcache.lock,再在下面获取 bucketlocks[key] 就会死锁  
  acquire(&bcache.lock);
  for (b = bcache.buckets[key].next; b; b = b->next) {
    if (b->dev == dev && b->blockno == blockno) {
      acquire(&bcache.bucketlocks[key]);
      b->refcnt++;
      release(&bcache.bucketlocks[key]);
      release(&bcache.lock);
      acquiresleep(&b->lock);
      return b;
    }
  }
  // Not cached.
  int i, bidx = -1, flag = 0;
  struct buf *tmp = 0;
  for (i = 0; i < NBUCKET; i++) {
    acquire(&bcache.bucketlocks[i]);
    for (b = &bcache.buckets[i]; b->next; b = b->next) {
      if (b->next->refcnt == 0 && (!tmp || b->next->timestamp > tmp->next->timestamp)) {
          tmp = b;
          flag = 1;
      }
    }
    if (!flag) {
      release(&bcache.bucketlocks[i]); // 假如没有找到 buf 就开释锁
    } else {
      // 假如找到了新 buf,那就开释之前找到的 buf 的 bucket 锁,并保存当时 bucket 的锁
      flag = 0;
      if (bidx != -1) release(&bcache.bucketlocks[bidx]);
      bidx = i;
    }
  }
  if (!tmp) {
    panic("bget: no buffers");
  }
  b = tmp->next;
  tmp->next = b->next;
  release(&bcache.bucketlocks[bidx]); // 进行 evict 后再开释锁
  acquire(&bcache.bucketlocks[key]);
  b->next = bcache.buckets[key].next;
  bcache.buckets[key].next = b;
  b->dev = dev;
  b->blockno = blockno;
  b->valid = 0;
  b->refcnt = 1;
  release(&bcache.bucketlocks[key]);
  release(&bcache.lock);
  acquiresleep(&b->lock);
  return b;
}

测验结果

MIT 6.s081 Lab8: locks

总结

多线程场景下的问题真的很难搞,感觉在事务场景下,用一些常见的形式还没有那么难。到系统编程的场景下,这些问题都要自己考虑,处理一个问题之后或许又发生另一个问题,我自己对于这方面的能力仍是太弱了。

引证一下Miigon大佬的总结:

锁竞赛优化一般有几个思路:

  • 只在有必要同享的时分同享(对应为将资源从 CPU 同享拆分为每个 CPU 独立)
  • 有必要同享时,尽量削减在要害区中停留的时刻(对应“大锁化小锁”,降低锁的粒度)

榜首种思路便是本次 lab 的榜首部分,只要当时 CPU 的 freelist 现已为空时,才去和其它的 CPU 同享 freelist,其它状况下都是并行履行的。

第二种思路便是本次 lab 的第二部分,bcache 是没有办法独自划分给每个 CPU 的,属于有必要同享,所以只能经过缩小临界区,缩小锁的粒度来完成。

后续这方面仍是得多多加强。