在并发编程中,常常会涉及到锁、条件变量和信号量。本文从并发开始,探究为什么需求它们,它们的概念,完结原理以及运用。
并发简介
并发是指多个工作,在同一时间段内同时产生了。和并发常常一同被提到的是并行。并行是指多个工作,在同一时间点上同时产生了。
实质上来说,并发是为了程序运转的速度更快。在实践开发中,咱们或许会遇到比较耗时的操作,例如I/O操作。在整个程序运转过程中,假如某个阶段由于一直等候I/O而堵塞整个程序,就会降低程序的运转速度。为了提升程序的运转速度,咱们可以新开一个线程进行I/O操作,操作完结后告诉主线程,然后提升程序的运转速度。
并发带来的问题
并发尽管提升了程序的运转速度,可是不恰当的运用并发或许会带来一系列问题,原因在于并发是无法确认各线程履行的先后顺序的。
假定咱们想运转一个程序,它创建两个线程,每个线程都做了一些独立的工作(本例中打印“A”或“B”),代码如下:
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
void *mythread(void *arg) {
printf("%s\n", (char *) arg);
return NULL;
}
int main() {
pthread_t p1, p2;
int rc;
printf("main:begin\n");
rc = pthread_create(&p1, NULL, mythread, "A");
assert(rc == 0);
rc = pthread_create(&p2, NULL, mythread, "B");
assert(rc == 0);
rc = pthread_join(p1, NULL);
assert(rc == 0);
rc = pthread_join(p2, NULL);
assert(rc == 0);
printf("main:end\n");
return 0;
}
Tips
pthread_create:创建线程(实践上便是确认调用该线程函数的入口点),在线程创建今后,就开始运转相关的线程函数;
pthread_join:主线程等候子线程的终止。
以上代码或许的运转顺序如下:
还有更大的问题在于当多个线程操作同享数据时,会带来不行预期的成果,看如下代码:
#include <stdio.h>
#include <pthread.h>
static volatile int counter = 0;
void *mythread(void *arg) {
printf("%s:begin\n", (char *) arg);
for (int i = 0; i < 1e5; i++) {
counter += 1;
}
printf("%s:done\n", (char *) arg);
return NULL;
}
int main() {
pthread_t p1, p2;
printf("main:begin (counter=%d)\n", counter);
pthread_create(&p1, NULL, mythread, "A");
pthread_create(&p2, NULL, mythread, "B");
pthread_join(p1, NULL);
pthread_join(p2, NULL);
printf("main: done with both (counter=%d)\n", counter);
return 0;
}
该代码理论上会输出:
main:begin (counter=0)
A:begin
A:done
B:begin
B:done
main: done with both (counter=200000)
可是实践在屡次运转过程中输出成果如下:
main:begin (counter=0)
A:begin
B:begin
A:done
B:done
main: done with both (counter=115377)
main:begin (counter=0)
A:begin
B:begin
A:done
B:done
main: done with both (counter=107169)
...
咱们预期的核算成果是200000,但实践代码在履行过程中,运转成果是不确认的。
并发带来不行控的调度
上面代码运转成果不确,原因在于并发带来了不行控的调度。
咱们知道每个线程有自己的一组用于核算的寄存器。所以,当两个线程运转在同一个处理器上时,从线程 1 切换到线程 2 必定产生上下文切换,线程切换需求线程操控块(Thread Control Block,TCB)来保存每个线程的状况。
上面代码中,句子 counter += 1 在 x86 中的汇编指令如下:
mov 0x8409c2d, %eax
add $0x1, %eax
mov %eax, 0x8409c2d
假定变量counter在地址0x8409c2d。上面三条指令的详细含义如下:
-
首要 mov 指令从内存地址 0x8409c2d 取出 counter,放入 eax 寄存器;
-
给 eax 寄存器的值加 1 (0x1);
-
将 eax 的值存回内存中相同的地址。
在上面代码的履行过程中,或许出现这样的状况:
-
线程 1 进入代码区,而且将计数器 +1,它将 counter 的值(假定此刻等于 20)加载到它的寄存器 eax 中,因而线程 1 的 eax = 20,然后它向寄存器加 1,eax = 21;
-
此刻,产生时钟中止,操作体系将当前正在运转的线程(它的程序计数器、寄存器,包含 eax 等)的状况保存到线程的 TCB;
-
线程 2 开始运转,进入同一段代码。它也履行了第一条指令,获取计数器的值放入其 eax 中(运转时每个线程有自己的专用寄存器。上下切换代码将寄存器虚拟化,保存并康复它们的值)。此刻 counter 的值仍然为 20。接下来线程 2 运转后边的两条指令,将 eax 中保存的值 +1,然后将 eax 中的内容保存到 counter 中,此刻 counter 的值为 21;
-
时钟中止再次产生,上下文切换,线程 1 康复运转。它在之前现已履行过 mov 和 add 指令,所以此刻它履行最终一条 mov 指令(此刻 eax = 21),将值保存到内存,counter 再次被设置成 21。
原子办法履行
从以上可以看出,由于时钟终端,导致了counter尽管被加了两次,但表现出来的成果是被加了一次。咱们希望拥有一个不行被中止的指令,然后消除不达时宜的中止,得到正确的运转成果,即希望以原子办法履行。
所谓原子办法即要么运转完结,要么没有运转,不会存在中心态。上面代码中咱们希望以原子办法履行 3 个指令:
mov 0x8409c2d, %eax
add $0x1, %eax
mov %eax, 0x8409c2d
要想做到这一点,就需求硬件供给一些有用的指令,在这些指令上构建一个通用的调集,即所谓的同步原语。经过运用同步原语,加上操作体系的一些帮助,就可以构建多线程代码,以同步和受控的办法拜访临界区,然后牢靠的产生成果。
并发术语:临界区、竞态条件、不确认性、互斥履行
临界区(critical section):拜访同享资源的一段代码,资源通常是一个变量或数据结构。
竞态条件(race condition):出现在多个履行线程大致同时进入临界区时,它们都企图更新同享的数据结构,这导致了不确认的运转成果。
不确认性(indetermination):程序由一个或多个竞态条件组成,程序的输出因运转而异,详细取决于哪些线程在何时运转。这导致成果不是确认的。
为了避免以上问题,线程应该采用某种互斥(mutual exclusion)原语。这样做可以确保只要一个线程进入临界区,然后避免出现竞态,确保程序输出确认的成果。
并发问题的处理
经过前面临并发的介绍以及并发带来的问题,咱们知道假如临界区可以像单条原子指令相同履行,那么就可以确保程序运转正确。咱们经过引进锁来确保临界区代码的正确履行。
锁
什么是锁
锁实质上是一个变量,因而咱们需求声明一个某种类型的锁变量才能运用。这个锁变量保存了锁在某一时间的状况。它要么是可用的(available,或 unlocked,或 free),表明没有线程持有锁,要么是被占用(acquired,或 locked,或 held),表明一个线程持有锁,正处于临界区。锁变量也可以保存其他信息,例如持有锁的线程,恳求锁的线程行列等,但这些信息会被躲藏起来,锁的运用者不会发现。
锁的持有者一旦调用 unlock(),锁就变成可用了。假如没有其他等候线程(即没有其他线程调用过lock()并卡在那里),锁的状况就变成可用了。假如有等候线程(卡在lock()里),其间一个会注意到锁状况的改变,获取该锁,进入临界区。
锁的运用
POSIX(Portable Operating System Interface of UNIX,可移植操作体系接口)库将锁称为互斥量(mutex),它被用来供给线程之间的互斥,即当一个线程在临界区,它可以阻挠其他线程进入直到该线程离开临界区。详细运用办法如下:
#include <pthread.h>
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&lock);
balance = balance + 1;
pthread_mutex_unlock(&lock);
在 Go 中由 sync 包供给互斥锁,详细运用办法如下:
import "sync"
var mu sync.Mutex
func sum() {
mu.Lock()
counter = counter + 1
mu.Unlock()
}
锁的完结
前面咱们理解了锁的实质以及运用,那么怎么完结一个锁呢?锁的完结需求硬件和操作体系的帮助。完结锁的硬件指令有操控中止指令、测验并设置指令、比较并交流指令、链接的加载和条件式存储指令、获取并添加指令。
操控中止
完结一个锁,首要能想到的便是操控中止,即在临界区封闭中止。代码如下:
void lock() {
DisableInterrupts();
}
void unlock() {
EnableInterrupts();
}
假定咱们运转在一个单处理器体系上,经过进入临界区之前封闭中止,就可以确保临界区的代码不会被中止,然后原子地履行。运转结束后再翻开中止。
但这样的锁完结存在诸多问题:
-
需求信赖调用****线程:这种办法要求一切调用线可以履行特权操作(开关中止),即信赖调用线程不会乱用这种机制;
-
不支撑多处理器:假如多个线程运转在不同的处理器上,每个线程都企图进入同一个临界区,则封闭中止是没有用的。线程可以运转在其他处理上然后进入临界区;
-
功率较低:和其他指令相比,现代处理器对于翻开和封闭中止的代码履行得较慢。
测验并设置(TSL)
由于封闭中止的办法存在以上问题,所以体系设计者开始让硬件支撑锁。其间最简略的是测验并设置指令(test-and-set-lock instruction),又名原子交流(atomic exchange)。测验并设置指令的逻辑如下:
int TestAndSet(int *old_ptr, int new) {
int old = *old_ptr; // 从 old_ptr 中取旧值
*old_ptr = new; // 将 new 存储到 old_ptr
return old; // 回来 old
}
以上代码是原子履行的。经过测验并设置指令,咱们可以完结一个简略的自旋锁,详细代码如下:
typedef struct lock_t {int flag;
} lock_t;
void init(lock_t *lock) {
// 0 代表锁可用,1 代表锁被占用
lock->flag = 0;
}
void lock(lock_t *lock) {
while(TestAndSet(&lock->flag, 1) == 1) {
// 自旋
}
}
void unlock(lock_t *lock) {
lock->flag = 0;
}
其间测验和设置的原子履行确保了只要一个线程可以获取锁,这就完结了一个有用的互斥原语。
比较并交流(CAS)
比较并交流指令是体系供给的另一个硬件原语,详细逻辑如下:
int CompareAndSwap(int *ptr, int expected, int new) {
int actual = *ptr;
if(actual == expected) {
*ptr = new;
}
return actual;
}
// or
int CompareAndSwap(int *ptr, int expected, int new) {
if(*ptr == expected) {
*ptr = new;
return 1;
} else {
return 0;
}
}
利用CAS指令,就可以完结一个锁,类似于测验并设置,只需求将lock函数替换为:
void lock(lock_t *lock) {
while(CompareAndSwap(&lock->flag, 0, 1) == 1) {
// spin
}
}
链接的加载和条件式存储(LL/SC)
链接的加载和条件式存储 load-linked/store-conditional(LL/SC),又称 load-reserved/store-conditional (LR/SC),是一对用于并发同步拜访内存的指令。这些指令的逻辑如下:
int LoadLinked(int *ptr) {
return *ptr;
}
int StoreConditional(int *ptr, int value) {
if(no one has updated *ptr since the LoadLinked to this address) {
*ptr = value;
return 1;
} else {
return 0;
}
}
链接的加载指令和一般的加载指令类似,都是从内存中取出值然后存入一个寄存器。要害的区别在于条件存储指令(store-conditional),只要上一次加载的地址在期间没有更新时,条件存储指令才会在该内存位置保存新值。
利用链接的加载和条件式存储指令完结一个自旋锁的代码如下:
void lock(lock_t *lock) {
while (1) {
while (LoadLinked(&lock->flag) == 1) {
// spin until it's zero
}
if (StoreConditional(&lock->flag, 1) == 1) {
return;
}
}
}
以上代码中,条件式存储产生失败的状况如下:
-
线程 A 调用lock(),履行链接的加载指令,回来 0;
-
在履行条件式存储之前,产生了中止;
-
线程 B 进入 lock 代码,也履行了链式的加载指令,也回来了 0;
-
线程 A 和 B 都履行了链式的加载指令,即将履行条件存储。但只要一个线程可以更新标志为1,然后成功获取锁;第二个履行条件存储的线程会失败,需求从头尝试获取锁。
获取并添加(FAA)
获取并添加(fetch-and-add)指令是最终一个硬件原语,它能原子地回来特定地址的旧值,并让该值自增一。该指令的逻辑如下:
int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}
利用 FAA 完结lock 和 unlock:
typedef struct lock_t {int ticket;
int turn;
} lock_t;
void lock_init(lock_t *lock) {
lock->ticket = 0;
lock->turn = 0;
}
void lock(lock_t *lock) {
int turn_data = FetchAndAdd(&lock->ticket);
while (lock->turn != turn_data) {
// spin
}
}
void unlock(lock_t *lock) {
FetchAndAdd(&lock->turn);
}
FAA 运用ticket和turn两个变量来构建锁,假如线程希望获取锁,首要对一个ticket值履行一个原子的获取并添加指令。得出的值(turn_data)作为该线程的 turn 值,当某个线程的 turn_data == turn 时,该线程进入临界区。
unlock用于添加turn值,然后使下一个等候线程进入临界区。
Go 中锁的完结
根据以上,咱们看下 Go 中锁是怎么完结的。Go 锁的完结在go/mutex.go at master golang/go (github.com)中,lock和unlock的逻辑如下:
type Mutex struct {
state int32
sema uint32
}
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
// ...return
}
m.lockSlow()
}
func (m *Mutex) Unlock() {
// ...new := atomic.AddInt32(&m.state, -mutexLocked)
// ...
}
可以看到 Go 中运用比较并交流指令(atomic.CompareAndSwapInt32)完结lock和unlock函数。其间atomic.CompareAndSwapInt32的详细逻辑如下:
// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
// if(*val == old){
// *val = new;
// return 1;
// }else
// return 0;
TEXT Cas(SB), NOSPLIT, $0-13
MOVL ptr+0(FP), BX
MOVL old+4(FP), AX
MOVL new+8(FP), CX
LOCK
CMPXCHGL CX, 0(BX)
SETEQ ret+12(FP)
RET
关于 Go 中锁的完结就介绍这么多,更详细的源码分析可以查看文章:Go源码解析之mutex。
条件变量(Condition)
上面介绍了锁的概念以及怎么经过硬件和操作体系来完结锁,但锁并不是并发程序所需的唯一原语。许多时候线程需求在查看某一条件满意后,才持续运转。例如父线程需求查看子线程履行结束等。
咱们可以运用一个同享变量,让主线程自旋查看是否完结,假如子线程履行完结,则持续:
volatile int done = 0;
void *child(void *arg) {
printf("child\n");
done = 1;
return NULL;
}
int main(int argc, char *argv[]) {
printf("parent: begin\n");
pthread_t c;
pthread_create(&c, NULL, child, NULL); // 创建子线程
// 子线程未履行完结,则一直循环等候
while(done == 0) {
// spin
}
printf("parent: end\n");
return 0;
}
这种计划尽管可以满意一定条件后持续履行,但功率很低下,由于自旋查看会浪费 CPU 时间(占用着 CPU,却什么都不做)。咱们希望当条件没有得到满意时,父线程让出 CPU 并休眠等候,直到条件满意后履行。
什么是条件变量
咱们可以运用条件变量(condition variable)来等候一个条件变成真。条件变量是一个显式行列,当某些履行状况(即条件)不满意时,线程把自己参加行列,等候该条件。当某个线程改动上述状况时,就唤醒一个或多个等候线程,然后让它们持续履行。
条件变量的运用
pthread_cond_t c = PTHREAD_COND_INITIALIZER;
pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m);
pthread_cond_signal(pthread_cond_t *c);
-
pthread_cond_t:声明条件变量;
-
pthread_cond_wait:在调用时传入一个互斥量,它假定在 wait 调用时,这个互斥量现已上锁。wait 的职责是开释锁,并让调用线程休眠(原子地);
-
pthread_cond_signal:唤醒等候在某个条件变量上的睡觉线程。
详细的运用示例如下:
#include <pthread.h>
#include <printf.h>
int done = false;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c = PTHREAD_COND_INITIALIZER;
void *child(void *arg) {
printf("children\n");
thr_exit();
return NULL;
}
void thr_exit() {
pthread_mutex_lock(&m);
done = true;
pthread_cond_signal(&c);
pthread_mutex_unlock(&m);
}
void thr_join() {
pthread_mutex_lock(&m);
while (!done) {
pthread_cond_wait(&c, &m);
}
pthread_mutex_unlock(&m);
}
int main(int argc, char *argv[]) {
printf("parent: begin\n");
pthread_t p;
pthread_create(&p, NULL, child, NULL);
thr_join();
printf("parent: end\n");
return 0;
}
在 Go 中运用条件变量需求引进sync包下的Cond变量,详细运用办法如下:
package main
import (
"fmt""sync""time"
)
var done = falsefunc read(name string, c *sync.Cond) {
c.L.Lock()
for !done {
c.Wait()
}
fmt.Println(name, "starts reading")
c.L.Unlock()
}
func write(name string, c *sync.Cond) {
fmt.Println(name, "starts writing")
time.Sleep(time.Second)
c.L.Lock()
done = true
c.L.Unlock()
fmt.Println(name, "wakes all")
// 唤醒一切协程
c.Broadcast()
}
func main() {
cond := sync.NewCond(&sync.Mutex{})
go read("reader1", cond)
go read("reader2", cond)
go read("reader3", cond)
write("writer", cond)
time.Sleep(time.Second * 3)
}
条件变量的两个坑
虚伪唤醒
第一个坑是虚伪唤醒。细心看上面的代码,可以看到对条件变量运用 while(!done){} 循环(Go 中对应 for !done {})而不是if(!done)。原因在于wait()操作在让线程休眠后会开释锁。
-
假定有两个线程 A 和 B,A 由于条件不满意而休眠并开释锁(此刻任何其他等候线程可以抢锁);
-
等候一段时间后条件满意,发出信号唤醒线程 A 进入安排妥当行列;
-
这时假定线程 B 抢先履行,改动了条件变量,线程 B 运转直到结束后;
-
线程 A 开始运转,此刻 A 从 wait 回来之前运转,它获取锁,由所以 if 判别,所以线程 A 往下履行,但此刻条件变量已不满意运转条件,往下运转会出现问题。
所以发信号给线程仅仅唤醒它们,暗示状况产生了改变,但并不会确保在它运转之前状况一直是希望的状况,即存在虚伪唤醒的状况。
为了处理虚伪唤醒问题,咱们运用while循环而不是if查看条件变量的状况,确保条件变量是满意的状况下再持续履行。
唤醒丢掉
第二个坑是唤醒丢掉。上面代码中,在判别是否调用wait前先调用****lock,这是为了避免线程由于中止导致长眠不醒(即唤醒丢掉)。
详细来说,假如父进程调用 thr_join() ,然后查看完 done 的值为 false,企图睡觉。但在调用 wait 进入睡觉之前,父进程被中止。子线程修正动量 done 为 true,发出信号,相同没有等候线程。父线程再次运转时,就会长眠不醒。
wait前先lock可以确保父线程履行完一切wait中的操作并进入等候状况后,子线程才可以调用signal。
信号量(semaphore)
什么是信号量
信号量是(semaphore)用于处理多线程同步问题(例如生产者消费者问题),它是一个同步目标,用于保持在 0 至指定最大值之间的一个计数值。当线程完结一次对该 semaphore 目标的等候(wait)时,该计数值减一;当线程完结一次对 semaphore 目标的开释(release)时,记数值加 1。当计数值为 0,则线程等候该 semaphore 目标不再能成功直至该 semaphore 目标变成 signaled 状况。semaphore 目标的计数值大于 0,为 signaled 状况;计数值等于 0,为 nosignaled 状况。
信号量的初始值决定其行为,所以首要要运用sem_init初始化信号量:
int sem_init(sem_t *sem, int pshared, unsigned int value);
-
pshared:为 0 表明信号量是在同一进程的多个线程同享,非 0 则表明信号量在不同进程间同享;
-
value:标识信号量的初始值。
信号量初始化后,咱们就可以调用sem_wait(P操作)和sem_post(V操作)与之交互:
-
sem_wait():信号量的值减 1。当信号量的值大于等于 0 时,立即回来;当信号量的值小于 0 时,让调用线程挂起,直到之后一个sem_post()操作使信号量的值大于等于 0。
-
sem_post():信号量的值加 1,假如有线程等候,唤醒其间一个。
当信号量的值为负数时,这个值便是行列中等候被调用的线程个数。尽管这个值通常不会暴露给运用者。
信号量用作锁
信号量可以用作锁(此刻被称为二值信号量),咱们可以直接把临界区用sem_wait()/sem_post()包裹。信号量用作锁的要害在于其初始值 x:
sem_t m;
sem_init(&m, 0, x);
sem_wait();
// 临界区
sem_post();
当初始值 x=1 时信号量用作锁,详细分析如下:
-
首要线程 A 调用sem_wait(),它将信号量的值减为 0。由于只要在值小于 0 时才会等候,所以线程 A 从函数回来并持续履行;
-
线程 A 持有锁(即调用sem_wait()之后,sem_post之前),此刻线程 B 尝试进入临界区,调用sem_wait()将信号量减为 -1,然后进入等候履行行列。
-
线程 A 履行完临界区代码后,调用sem_post(),将信号量添加到 0,唤醒等候线程 B,线程 B 获取锁;
-
线程 B 履行结束,调用sem_post(),信号量康复为 1。
信号量用作条件变量
信号量也可以用在一个线程暂停履行,等候某一条件成立的场景。看下面的例子,假定一个线程创建别的一线程而且等候它结束:
#include <stdio.h>
#include <semaphore.h>
#include <printf.h>
#include <pthread.h>
sem_t s;
int X = ?;
void *child(void *arg) {
printf("child\n");
sem_post(&s);
return NULL;
}
int main() {
sem_init(&s, 0, X);
printf("parent: begin\n");
pthread_t c;
pthread_create(&c, NULL, child, NULL);
sem_wait(&s); // wait for childprintf("parents: end\n");
return 0;
}
当程序运转时,希望看到如下成果:
parent: begin
child
parent: end
从上面代码可知,父线程调用sem_wait(),子线程调用sem_post(),父线程等候子线程履行完结。问题的要害在于信号量初始值X应该是多少?
信号量的初始值应该是 0。考虑以下两种状况:
-
父线程创建了子线程,但子线程并没有运转:这种状况下,父线程调用sem_wait()会先于子线程调用sem_post()。此刻父线程运转,将信号量的值减位 -1,然后休眠等候;子线程运转,调用sem_post(),信号量添加位 0,唤醒父线程,父线程从sem_wait()回来,程序履行完结。
-
子线程在父线程调用sem_wait()之前就运转结束:这种状况下,子线程会先调用sem_post(),将信号量从 0 添加到 1。当父线程运转时,调用sem_wait(),发现信号量值为 1。所以父线程将信号量从 1 减位 0,无需等候,直接从sem_wait()回来。
所以综上所述,当信号量用作锁时初始值为 1,当信号量用作条件变量时初始值为 0。
信号量的完结
sem_wait和sem_post是硬件原语完结的,所以可以运用上面介绍锁的完结中提到的硬件原语完结,这儿以比较并交流原语(CAS)为例,完结逻辑如下:
sem_wait(s) {
while(CompareAndSwap(s.flag, 0, 1) == 1) {
/* 不做任何工作 */
}
s.count--;
if(s.count < 0) {
/* 该线程进入 s.quene 行列 */
/* 堵塞该线程(还需将 s.flag 设置为 0) */
}
s.flag = 0;
}
sem_post(s) {
while(CompareAndSwap(s.flag, 0, 1) == 1) {
/* 不做任何工作 */
}
s.count++;
if(s.count <= 0) {
/* 从s.queue 行列中移出线程 p */
/* 线程 p 进入安排妥当行列 */
}
s.flag = 0;
}
Go 中的计数信号量
Go 中的信号量经过导入 sync 包的 WaitGroup 来初始化,详细运用办法如下:
package main
import (
"fmt"
"sync")
var data int
func counter() {
for i := 0; i < 1e5; i++ {
data = data + 1
}
}
func main() {
var wg sync.WaitGroup
// 信号量的初始值
wg.Add(2)
go func() {
// 履行完结,信号量值减 1
defer wg.Done()
counter()
}()
go func() {
defer wg.Done()
counter()
}()
// 等候履行完结
wg.Wait()
fmt.Println(data)
}
读者写者锁
前面讲到的锁在运用的时候,灵活性不是很好,例如咱们对一个并发数据结构有刺进和查找操作,假如直接用lock和unlock,会导致咱们在刺进和查找时都被锁限制。事实上由于刺进操作会修正数据结构,所以需求加锁,而查找操作仅仅读取该数据结构,并发履行并不会影响读取成果。读者写者锁(reader-writer lock)便是用来完结这种操作的。
读写锁可以是读加锁,写加锁以及未加锁三种状况。每次只能由一个线程处于写加锁状况可是可以有多个线程处于读加锁状况。读写锁是一把锁,它就像多路开关相同:
-
锁处于写加锁状况:堵塞企图对这个锁进行读加锁或写加锁的线程;
-
锁处于读加锁状况:堵塞企图对这个锁进行写加锁的线程,不堵塞企图对这个锁进行读加锁的线程。
运用信号量完结读者写者锁的逻辑如下:
#include <semaphore.h>
typedef struct _rwlock_t {
sem_t lock; // 二值信号量 (基础锁)
sem_t writelock; // 写锁,答应一个写,多个读
int readers; // 统计读者的数量
} rwlock_t;
void rwlock_init(rwlock_t *rw) {
rw->readers = 0;
sem_init(&rw->lock, 0, 1);
sem_init(&rw->writelock, 0, 1);
}
void rwlock_acquire_readlock(rwlock_t *rw) {
sem_wait(&rw->lock);
rw->readers++;
// 第一个读者获取 lock 后,要加 writelock,
// 堵塞读过程中其他线程的写操作
if(rw->readers == 1) {
sem_wait(&rw->writelock);
}
sem_post(&rw->lock);
}
void rwlock_release_readlock(rwlock_t *rw) {
sem_wait(&rw->lock);
rw->readers--;
// 最终一个读者要开释 writelock,
// 告诉写操作可以进行
if (rw->readers == 0) {
sem_post(&rw->writelock);
}
sem_post(&rw->lock);
}
void rwlock_acquire_writelock(rwlock_t *rw) {
sem_wait(&rw->writelock);
}
void rwlock_release_writelock(rwlock_t *rw) {
sem_post(&rw->writelock);
}
获取读锁,读者首要获取lock,然后添加reader的数量,追寻目前有多少个读者在拜访该数据结构。当第一个读者获取该锁时,读者也会获取写锁,即writelock信号量上调用sem_wait(),最终调用sem_post()开释lock。一旦一个读者获取了读锁,其他读者也会获取这个读锁。可是,想要获取写锁的线程,有必要比及一切读者都结束。最终一个退出的写者在“writelock”信号量上调用sem_post(),然后让等候的写者可以获取该锁。
总结
以上便是关于并发编程中锁、条件变量和信号量的相关介绍。再回忆一下:
-
锁实质上是一个变量,咱们经过运用lock()和unlock(),确保仅有一个线程可以拜访临界区。锁经过硬件指令完结,硬件指令有:操控中止、测验并设置(TSL)、比较并交流(CAS)、链接的加载和条件式存储(LL/SC)、获取并添加(FAA)。在 Go 中锁是经过 CAS 完结的。
-
条件变量(condition)用于等候一个条件变量变成真时再履行。经过调用wait休眠等候,调用signal告诉条件变量状况改动,唤醒休眠线程。
-
信号量( semaphore)用于处理多进(线)程同步问题,信号量的初始值决定其行为,当初始值为 1 时称为二进制信号量,功能与锁相同;当初始值为 0 时可以用作条件变量。
-
读者写者锁完结了有读有写的状况下并发读,互斥写,然后提升全体性能。
参阅
-
操作体系导论锁、条件变量、信号量相关内容
-
条件变量condition_variable的运用及圈套
-
详解linux多线程——互斥锁、条件变量、读写锁、自旋锁、信号量
-
POSIX读写锁
-
Linux 线程库