我正在参加「启航方案」

前语

前面两篇文章 初见 Go Mutex、 Go Mutex 源码详解 ,我们学习了 Go言语 中的 Mutex,它是一把互斥锁,每次只答应一个 goroutine 进入临界区,这种能够确保临界区资源的状况正确性。可是有的情况下,并不是一切 goroutine 都会修正临界区状况,或许仅仅读取临界区的数据,假如此刻仍是需求每个 goroutine 拿到锁顺次进入的话,效率就有些低下了。例如房间里面有一幅画,有人想修正,有人仅仅想看一下,完全能够放要看的一部分人进去,等他们看完再让修正的人进去修正,这样既提高了效率,也确保了临界区资源的安全。看和修正,对应的便是读和写,本篇文章我们就一起来学习下 Go言语 中的读写锁 sync.RWMutex

阐明:本文中的示例,均是根据Go1.17 64位机器

RWMutex 总览

RWMutex 是一个读/写互斥锁,在某一时间只能由恣意数量reader 持有 或许 一个 writer 持有。也便是说,要么放行恣意数量的 reader,多个 reader 能够并行读;要么放行一个 writer,多个 writer 需求串行写

RWMutex 对外暴露的办法有五个:

  • RLock():读操作获取锁,假如锁已经被 writer 占用,会一向堵塞直到 writer 开释锁;否则直接取得锁;
  • RUnlock():读操作完毕之后开释锁;
  • Lock():写操作获取锁,假如锁已经被 reader 或许 writer 占用,会一向堵塞直到获取到锁;否则直接取得锁;
  • Unlock():写操作完毕之后开释锁;
  • RLocker():回来读操作的 Locker 目标,该目标的 Lock() 办法对应 RWMutexRLock()Unlock() 办法对应 RWMutexRUnlock() 办法。

一旦涉及到多个 reader 和 writer ,就需求考虑优先级问题,是 reader 优先仍是 writer 优先:

  • reader优先:只需有 reader 要进行读操作,writer 就一向等候,直到没有 reader 到来。这种方法做到了读操作的并发,可是假如 reader 继续到来,会导致 writer 饥饿,一向不能进行写操作;

  • writer优先:只需有 writer 要进行写操作,reader 就一向等候,直到没有 writer 到来。这种方法提高了写操作的优先级,可是假如 writer 继续到来,会导致 reader 饥饿,一向不能进行读操作;

  • 没有优先级:依照先来先到的次序,没有谁比谁更优先,这种相对来说会更公正。

我们先来看下 RWMutex 的运转机制,就能够知道它的优先级是什么了。

能够幻想 RWMutex 有两个队伍,一个是包含 一切reader 和你取得准入权writer行列A,一个是还没有取得准入权 writer 的 行列B

  1. 行列 A 最多只答应有 一个writer,假如有其他 writer,需求在 行列B 等候;
  2. 当一个 writer 到了 行列A 后,只答应它 之前的reader 履行读操作,新来的 reader 需求在 行列A 后边排队;
  3. 当时面的 reader 履行完读操作之后,writer 履行写操作;
  4. writer 履行完写操作后,让 后边的reader 履行读操作,再唤醒行列B 的一个 writer 到 行列A 后边排队。

初始时间 行列A 中 writer W1 前面有三个 reader,后边有两个 reader,行列B中有两个 writer

Go语言读写锁 RWMutex 源码详解

并发读 多个 reader 能够一起获取到读锁,进入临界区进行读操作;writer W1行列A 中等候,一起又来了两个 reader,直接在 行列A 后边排队

Go语言读写锁 RWMutex 源码详解

写操作 W1 前面一切的 reader 完结后,W1 取得锁,进入临界区操作

Go语言读写锁 RWMutex 源码详解

取得准入权 W1 完结写操作退出,先让后边排队的 reader 进行读操作,然后从 行列B 中唤醒 W2行列A 排队。W2行列B行列A 的过程中,R8 先到了 行列A,因而 R8 能够履行读操作。R9R10R11W2 之后到的,所以在后边排队;新来的 W4 直接在行列B 排队。

Go语言读写锁 RWMutex 源码详解

从上面的示例能够看出,RWMutex 能够看作是没有优先级,依照先来先到的次序去履行,只不过是 多个reader 能够 并行 去履行算了。

深化源码

数据结构

type RWMutex struct {
	w           Mutex  // 操控 writer 在 行列B 排队
	writerSem   uint32 // 写信号量,用于等候前面的 reader 完结读操作
	readerSem   uint32 // 读信号量,用于等候前面的 writer 完结写操作
	readerCount int32  // reader 的总数量,一起也指示是否有 writer 在行列A 中等候
	readerWait  int32  // 行列A 中 writer 前面 reader 的数量
}
// 答应最大的 reader 数量
const rwmutexMaxReaders = 1 << 30

上述中的几个变量,比较特殊的是 readerCount ,不仅表明当时 一切reader 的数量,一起表明是否有 writer 在行列A中等候。当 readerCount 变为 负数 时,就代表有 writer 在行列A 中等候了。

  • 当有 writer 进入 行列A 后,会将 readerCount 变为负数,即 readerCount = readerCount - rwmutexMaxReaders,一起使用 readerWait 变量记载它前面有多少个 reader;
  • 假如有新来的 reader,发现 readerCount 是负数,就会直接去后边排队;
  • writer 前面的 reader 在开释锁时,会将 readerCountreaderWait都减一,当 readerWait==0 时,表明 writer 前面的一切 reader 都履行完了,能够让 writer 履行写操作了;
  • writer 履行写操作完毕后,会将 readerCount 再变回正数,readerCount = readerCount + rwmutexMaxReaders

举例:假定当时有两个 reader,readerCount = 2;答应最大的reader 数量为 10

  • 当 writer 进入行列A 时,readerCount = readerCount – rwmutexMaxReaders = -8,readerWait = readerCount = 2
  • 假如再来 3 个reader,readerCount = readerCount + 3 = -5
  • 取得读锁的两个reader 履行完后,readerCount = readerCount – 2 = -7,readerWait = readerWait-2 =0,writer 取得锁
  • writer 履行完后,readerCount = readerCount + rwmutexMaxReaders = 3,当时有 3个 reader

RLock()

reader 履行读操作之前,需求调用 RLock() 获取锁

func (rw *RWMutex) RLock() {
  // reader 加锁,将 readerCount 加一,表明多了个 reader
  if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    // 假如 readerCount<0,阐明有 writer 在自己前面等候,排队等候读信号量
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}

RUnlock()

reader 履行完读操作后,调用 RUnlock() 开释锁

func (rw *RWMutex) RUnlock() {
  // reader 开释锁,将 readerCount 减一,表明少了个 reader
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
    // 假如readerCount<0,阐明有 writer 在自己后边等候,看是否要让 writer 运转
		rw.rUnlockSlow(r)
	}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
  // 将 readerWait 减一,表明前面的 reader 少了一个
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// 假如 readerWait 变为了0,那么自己便是最终一个完结的 reader
    // 开释写信号量,让 writer 运转
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

Lock()

writer 履行写操作之前,调用 Lock() 获取锁

func (rw *RWMutex) Lock() {
   // 使用互斥锁,假如前面有 writer,那么就需求等候互斥锁,即在行列B 中排队等候;假如没有,能够直接进入 行列A 排队
   rw.w.Lock()
  // atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) 将 readerCount 变成了负数
  // 再加 rwmutexMaxReaders,相当于 r = readerCount,r 便是 writer 前面的 reader 数量
   r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
   // 假如 r!= 0 ,表明自己前面有 reader,那么令 readerWait = r,要等前面的 reader 运转完
   if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
      runtime_SemacquireMutex(&rw.writerSem, false, 0)
   }
}

Lock()RUnlock() 是会并发进行的:

  1. 假如 Lock() 将 readerCount 变为负数后,假定 r=3,表明参加的那一刻前面有三个 reader,还没有赋值 readerWait CPU 就被强占了,readerWait = 0;
  2. 假定此刻三个 reader 的 RUnlock() 会进入到 rUnlockSlow() 逻辑,每个 reader 都将 readerWait 减一, readerWait 会变成负数,此刻不符合唤醒 writer 的条件;
  3. 三个 reader 运转完之后,此刻 readerWait = -3, Lock() 运转到 atomic.AddInt32(&rw.readerWait, r) = -3+3 =0,也不会休眠,直接获取到锁,由于前面的 reader 都运转完了。

这便是为什么 rUnlockSlow() 要判断 atomic.AddInt32(&rw.readerWait, -1) == 0 以及 Lock() 要判断 atomic.AddInt32(&rw.readerWait, r) != 0 的原因。

Unlock()

writer 履行写操作之后,调用 Lock() 开释锁

func (rw *RWMutex) Unlock() {
  // 将 readerCount 变为正数,表明当时没有 writer 在行列A 等候了
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
  // 将自己后边等候的 reader 唤醒,能够进行读操作了
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
  // 开释互斥锁,假如行列B有writer,相当于唤醒一个来行列A 排队
	rw.w.Unlock()
}

writer 对 readerCount 一加一减,不会改动全体状况,仅仅用正负来表明是否有 writer 在等候。当然,假如在 writer 将 readerCount变为负数后,来了很多 reader,将 readerCount 变为了正数,此刻reader 在 writer 没有开释锁的时分就获取到锁了,是有问题的。可是 rwmutexMaxReaders 非常大,能够不考虑这个问题。

常见问题

  1. 不行仿制

    和 Mutex 相同,RWMutex 也是不行仿制。不能仿制的原因和互斥锁相同。一旦读写锁被使用,它的字段就会记载它当时的一些状况。这个时分你去仿制这把锁,就会把它的状况也给仿制过来。可是,原来的锁在开释的时分,并不会修正你仿制出来的这个读写锁,这就会导致仿制出来的读写锁的状况不对,或许永久无法开释锁。

  2. 不行重入

    不行重入的原因是,取得锁之后,还没开释锁,又申请锁,这样有或许形成死锁。比如 reader A 获取到了读锁,writer B 等候 reader A 开释锁,reader 还没开释锁又申请了一把锁,可是这把锁申请不成功,他需求等候 writer B。这就形成了一个循环等候的死锁。

  3. 加锁和开释锁一定要成对出现,不能忘记开释锁,也不能解锁一个未加锁的锁。

实战一下

Go 中的 map 是不支持 并发写的,我们能够使用 读写锁 RWMutex 来实现并发安全的 map。在读多写少的情况下,使用 RWMutex 要比 Mutex 功能高。

package main
import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)
type ConcurrentMap struct {
	m     sync.RWMutex
	items map[string]interface{}
}
func (c *ConcurrentMap) Add(key string, value interface{}) {
	c.m.Lock()
	defer c.m.Unlock()
	c.items[key] = value
}
func (c *ConcurrentMap) Remove(key string) {
	c.m.Lock()
	defer c.m.Unlock()
	delete(c.items, key)
}
func (c *ConcurrentMap) Get(key string) interface{} {
	c.m.RLock()
	defer c.m.RUnlock()
	return c.items[key]
}
func NewConcurrentMap() ConcurrentMap {
	return ConcurrentMap{
		items: make(map[string]interface{}),
	}
}
func main() {
	m := NewConcurrentMap()
	var wait sync.WaitGroup
	wait.Add(10000)
	for i := 0; i < 10000; i++ {
		key := fmt.Sprintf("%d", rand.Intn(10))
		value := fmt.Sprintf("%d", rand.Intn(100))
		if i%100 == 0 {
			go func() {
				defer wait.Done()
				m.Add(key, value)
			}()
		} else {
			go func() {
				defer wait.Done()
				fmt.Println(m.Get(key))
				time.Sleep(time.Millisecond * 10)
			}()
		}
	}
	wait.Wait()
}

总结

本文以图文并茂的方法介绍了RWMutex的运转机制,对源码进行逐行剖析,学习了 RWMutex 底层是怎么实现的,一起列举了一些 RWMutex 的常见错误。假如本篇文章对有所协助,点个重视 + 转发哦 ^_^

更多

个人博客: lifelmy.github.io/

微信大众号:漫漫Coding路