我正在参加「启航方案」
前语
前面两篇文章 初见 Go Mutex、 Go Mutex 源码详解 ,我们学习了 Go言语
中的 Mutex
,它是一把互斥锁,每次只答应一个 goroutine
进入临界区,这种能够确保临界区资源的状况正确性。可是有的情况下,并不是一切 goroutine
都会修正临界区状况,或许仅仅读取临界区的数据,假如此刻仍是需求每个 goroutine
拿到锁顺次进入的话,效率就有些低下了。例如房间里面有一幅画,有人想修正,有人仅仅想看一下,完全能够放要看的一部分人进去,等他们看完再让修正的人进去修正,这样既提高了效率,也确保了临界区资源的安全。看和修正,对应的便是读和写,本篇文章我们就一起来学习下 Go言语
中的读写锁 sync.RWMutex
。
阐明:本文中的示例,均是根据Go1.17 64位机器
RWMutex 总览
RWMutex
是一个读/写互斥锁
,在某一时间只能由恣意数量
的 reader
持有 或许 一个 writer
持有。也便是说,要么放行恣意数量的 reader,多个 reader 能够并行读
;要么放行一个 writer,多个 writer 需求串行写
。
RWMutex 对外暴露的办法有五个:
- RLock():读操作获取锁,假如锁已经被
writer
占用,会一向堵塞直到writer
开释锁;否则直接取得锁; - RUnlock():读操作完毕之后开释锁;
- Lock():写操作获取锁,假如锁已经被
reader
或许writer
占用,会一向堵塞直到获取到锁;否则直接取得锁; - Unlock():写操作完毕之后开释锁;
- RLocker():回来读操作的
Locker
目标,该目标的Lock()
办法对应RWMutex
的RLock()
,Unlock()
办法对应RWMutex
的RUnlock()
办法。
一旦涉及到多个 reader 和 writer ,就需求考虑优先级问题,是 reader 优先仍是 writer 优先:
-
reader优先:只需有 reader 要进行读操作,writer 就一向等候,直到没有 reader 到来。这种方法做到了读操作的并发,可是假如 reader 继续到来,会导致 writer 饥饿,一向不能进行写操作;
-
writer优先:只需有 writer 要进行写操作,reader 就一向等候,直到没有 writer 到来。这种方法提高了写操作的优先级,可是假如 writer 继续到来,会导致 reader 饥饿,一向不能进行读操作;
-
没有优先级:依照先来先到的次序,没有谁比谁更优先,这种相对来说会更公正。
我们先来看下 RWMutex 的运转机制,就能够知道它的优先级是什么了。
能够幻想 RWMutex 有两个队伍,一个是包含 一切reader
和你取得准入权writer
的 行列A
,一个是还没有取得准入权 writer 的 行列B
。
- 行列 A 最多只答应有
一个writer
,假如有其他 writer,需求在 行列B 等候; - 当一个 writer 到了 行列A 后,只答应它
之前的reader
履行读操作,新来的 reader 需求在 行列A 后边排队; - 当时面的 reader 履行完读操作之后,writer 履行写操作;
- writer 履行完写操作后,让
后边的reader
履行读操作,再唤醒行列B
的一个writer
到 行列A 后边排队。
初始时间 行列A 中 writer W1
前面有三个 reader,后边有两个 reader,行列B中有两个 writer
并发读 多个 reader 能够一起获取到读锁,进入临界区进行读操作;writer W1
在 行列A
中等候,一起又来了两个 reader,直接在 行列A
后边排队
写操作 W1
前面一切的 reader 完结后,W1
取得锁,进入临界区操作
取得准入权 W1
完结写操作退出,先让后边排队的 reader 进行读操作,然后从 行列B 中唤醒 W2
到 行列A
排队。W2
从 行列B
到 行列A
的过程中,R8
先到了 行列A
,因而 R8
能够履行读操作。R9
、R10
、R11
在 W2
之后到的,所以在后边排队;新来的 W4
直接在行列B 排队。
从上面的示例能够看出,RWMutex
能够看作是没有优先级,依照先来先到的次序去履行,只不过是 多个reader
能够 并行
去履行算了。
深化源码
数据结构
type RWMutex struct {
w Mutex // 操控 writer 在 行列B 排队
writerSem uint32 // 写信号量,用于等候前面的 reader 完结读操作
readerSem uint32 // 读信号量,用于等候前面的 writer 完结写操作
readerCount int32 // reader 的总数量,一起也指示是否有 writer 在行列A 中等候
readerWait int32 // 行列A 中 writer 前面 reader 的数量
}
// 答应最大的 reader 数量
const rwmutexMaxReaders = 1 << 30
上述中的几个变量,比较特殊的是 readerCount
,不仅表明当时 一切reader
的数量,一起表明是否有 writer
在行列A中等候。当 readerCount
变为 负数
时,就代表有 writer 在行列A 中等候了。
- 当有 writer 进入 行列A 后,会将
readerCount
变为负数,即readerCount = readerCount - rwmutexMaxReaders
,一起使用readerWait
变量记载它前面有多少个 reader; - 假如有新来的 reader,发现
readerCount
是负数,就会直接去后边排队; - writer 前面的 reader 在开释锁时,会将
readerCount
和readerWait
都减一,当 readerWait==0 时,表明 writer 前面的一切 reader 都履行完了,能够让 writer 履行写操作了; - writer 履行写操作完毕后,会将 readerCount 再变回正数,
readerCount = readerCount + rwmutexMaxReaders
。
举例:假定当时有两个 reader,readerCount = 2;答应最大的reader 数量为 10
- 当 writer 进入行列A 时,readerCount = readerCount – rwmutexMaxReaders = -8,readerWait = readerCount = 2
- 假如再来 3 个reader,readerCount = readerCount + 3 = -5
- 取得读锁的两个reader 履行完后,readerCount = readerCount – 2 = -7,readerWait = readerWait-2 =0,writer 取得锁
- writer 履行完后,readerCount = readerCount + rwmutexMaxReaders = 3,当时有 3个 reader
RLock()
reader 履行读操作之前,需求调用 RLock() 获取锁
func (rw *RWMutex) RLock() {
// reader 加锁,将 readerCount 加一,表明多了个 reader
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 假如 readerCount<0,阐明有 writer 在自己前面等候,排队等候读信号量
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
RUnlock()
reader 履行完读操作后,调用 RUnlock() 开释锁
func (rw *RWMutex) RUnlock() {
// reader 开释锁,将 readerCount 减一,表明少了个 reader
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// 假如readerCount<0,阐明有 writer 在自己后边等候,看是否要让 writer 运转
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
// 将 readerWait 减一,表明前面的 reader 少了一个
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// 假如 readerWait 变为了0,那么自己便是最终一个完结的 reader
// 开释写信号量,让 writer 运转
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
Lock()
writer 履行写操作之前,调用 Lock() 获取锁
func (rw *RWMutex) Lock() {
// 使用互斥锁,假如前面有 writer,那么就需求等候互斥锁,即在行列B 中排队等候;假如没有,能够直接进入 行列A 排队
rw.w.Lock()
// atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) 将 readerCount 变成了负数
// 再加 rwmutexMaxReaders,相当于 r = readerCount,r 便是 writer 前面的 reader 数量
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 假如 r!= 0 ,表明自己前面有 reader,那么令 readerWait = r,要等前面的 reader 运转完
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
Lock()
和 RUnlock()
是会并发进行的:
- 假如 Lock() 将 readerCount 变为负数后,假定 r=3,表明参加的那一刻前面有三个 reader,还没有赋值 readerWait CPU 就被强占了,readerWait = 0;
- 假定此刻三个 reader 的 RUnlock() 会进入到 rUnlockSlow() 逻辑,每个 reader 都将 readerWait 减一, readerWait 会变成负数,此刻不符合唤醒 writer 的条件;
- 三个 reader 运转完之后,此刻 readerWait = -3, Lock() 运转到 atomic.AddInt32(&rw.readerWait, r) = -3+3 =0,也不会休眠,直接获取到锁,由于前面的 reader 都运转完了。
这便是为什么 rUnlockSlow()
要判断 atomic.AddInt32(&rw.readerWait, -1) == 0
以及 Lock()
要判断 atomic.AddInt32(&rw.readerWait, r) != 0
的原因。
Unlock()
writer 履行写操作之后,调用 Lock() 开释锁
func (rw *RWMutex) Unlock() {
// 将 readerCount 变为正数,表明当时没有 writer 在行列A 等候了
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// 将自己后边等候的 reader 唤醒,能够进行读操作了
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 开释互斥锁,假如行列B有writer,相当于唤醒一个来行列A 排队
rw.w.Unlock()
}
writer 对 readerCount 一加一减,不会改动全体状况,仅仅用正负来表明是否有 writer 在等候。当然,假如在 writer 将 readerCount变为负数后,来了很多 reader,将 readerCount 变为了正数,此刻reader 在 writer 没有开释锁的时分就获取到锁了,是有问题的。可是 rwmutexMaxReaders 非常大,能够不考虑这个问题。
常见问题
-
不行仿制
和 Mutex 相同,RWMutex 也是不行仿制。不能仿制的原因和互斥锁相同。一旦读写锁被使用,它的字段就会记载它当时的一些状况。这个时分你去仿制这把锁,就会把它的状况也给仿制过来。可是,原来的锁在开释的时分,并不会修正你仿制出来的这个读写锁,这就会导致仿制出来的读写锁的状况不对,或许永久无法开释锁。
-
不行重入
不行重入的原因是,取得锁之后,还没开释锁,又申请锁,这样有或许形成死锁。比如 reader A 获取到了读锁,writer B 等候 reader A 开释锁,reader 还没开释锁又申请了一把锁,可是这把锁申请不成功,他需求等候 writer B。这就形成了一个循环等候的死锁。
-
加锁和开释锁一定要成对出现,不能忘记开释锁,也不能解锁一个未加锁的锁。
实战一下
Go 中的 map 是不支持 并发写的,我们能够使用 读写锁 RWMutex 来实现并发安全的 map。在读多写少的情况下,使用 RWMutex 要比 Mutex 功能高。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type ConcurrentMap struct {
m sync.RWMutex
items map[string]interface{}
}
func (c *ConcurrentMap) Add(key string, value interface{}) {
c.m.Lock()
defer c.m.Unlock()
c.items[key] = value
}
func (c *ConcurrentMap) Remove(key string) {
c.m.Lock()
defer c.m.Unlock()
delete(c.items, key)
}
func (c *ConcurrentMap) Get(key string) interface{} {
c.m.RLock()
defer c.m.RUnlock()
return c.items[key]
}
func NewConcurrentMap() ConcurrentMap {
return ConcurrentMap{
items: make(map[string]interface{}),
}
}
func main() {
m := NewConcurrentMap()
var wait sync.WaitGroup
wait.Add(10000)
for i := 0; i < 10000; i++ {
key := fmt.Sprintf("%d", rand.Intn(10))
value := fmt.Sprintf("%d", rand.Intn(100))
if i%100 == 0 {
go func() {
defer wait.Done()
m.Add(key, value)
}()
} else {
go func() {
defer wait.Done()
fmt.Println(m.Get(key))
time.Sleep(time.Millisecond * 10)
}()
}
}
wait.Wait()
}
总结
本文以图文并茂的方法介绍了RWMutex的运转机制,对源码进行逐行剖析,学习了 RWMutex 底层是怎么实现的,一起列举了一些 RWMutex 的常见错误。假如本篇文章对有所协助,点个重视 + 转发哦 ^_^
更多
个人博客: lifelmy.github.io/
微信大众号:漫漫Coding路