1. 简介

本文将介绍 Go 语言中的 WaitGroup 并发原语,包含 WaitGroup 的根本运用办法、完结原理、运用注意事项以及常见的运用办法。能够更好地理解和运用 WaitGroup 来和谐多个 Goroutine 的履行,进步 Go 并发编程的功率和稳定性。

2. 根本运用

2.1 界说

WaitGroup是Go语言标准库中的一个结构体,它供给了一种简单的机制,用于同步多个协程的履行。适用于需求并发履行多个使命并等候它们悉数完结后才干持续履行后续操作的场景。

2.2 运用办法

首要主协程创立WaitGroup实例,然后在每个协程的开端处,调用Add(1)办法,表明需求等候一个使命履行完结,然后协程在使命履行完结之后,调用Done办法,表明使命现已履行完结了。

主协程中,需求调用Wait()办法,等候一切协程完结使命,示例如下:

func main(){
    //首要主协程创立WaitGroup实例
    var wg sync.WaitGroup
    // 开端时调用Add办法表明有个使命开端履行
    wg.Add(1)
    go func() {
        // 开端履行...
        //完结之后,调用Done办法
        wg.Done()
    }()
    // 调用Wait()办法,等候一切协程完结使命
    wg.Wait()
    // 履行后续逻辑
}

2.3 运用比如

package main
import (
    "fmt"
    "sync"
)
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
       wg.Add(1)
       go func(i int) {
          defer wg.Done()
          fmt.Printf("使命%d开端履行\n", i)
          // 模仿协程使命履行一段时间
          time.Sleep(time.Duration(rand.Int() % 100))
          // 线程使命履行完结
          fmt.Printf("使命%d履行结束\n", i)
       }(i)
    }
    fmt.Println("主协程开端等候一切使命履行完结...")
    wg.Wait()
    fmt.Println("一切协程现已履行结束...")
}

在这个比如中,咱们运用了sync.WaitGroup来等候5个协程履行结束。在循环中,每创立一个使命,咱们调用一次wg.Add(1)办法,然后发动一个协程去履行使命,当协程完结使命后,调用wg.Done办法,奉告主协程使命现已履行结束。然后主协程会在5个协程使命悉数履行结束之后,才会持续向下履行。

3.完结原理

3.1 规划初衷

WaitGroup的规划初衷便是为了等候一组操作完结后再履行下一步操作,一般会在一组协程中运用。

3.2 根本原理

sync.WaitGroup 结构体中的 state1state2 字段是用于完结 WaitGroup 功用的重要变量。

type WaitGroup struct {
   noCopy noCopy
   state1 uint64
   state2 uint32
}

由于 WaitGroup 需求等候一组操作完结之后再履行,因而需求等候一切操作完结之后才干持续履行。为了完结这个功用,WaitGroup 运用了一个计数器 counter 来记载还有多少个操作没有完结,假如 counter 的值为 0,则表明一切操作现已完结。

一起,WaitGroup 在一切使命都完结之后,需求唤醒一切处于等候的协程,此刻需求知道有多少个协程处于等候状况。为了完结这个功用,WaitGroup 运用了一个等候计数器 waiter 来记载当前有多少个协程正在等候操作完结。

这儿WaitGroup关于计数器和等候计数器的完结,是经过一个64位无符号整数来完结的,也便是WaitGroup结构体中的state1,其中高32位保存了使命计数器counter的值,低32位保存了等候计数器waiter的值。当咱们创立一个 WaitGroup 实例时,该实例的使命计数器等候计数器都被初始化为 0。

并且,等候协程需求等候一切使命完结之后才干持续履行,所以等候协程在使命未完结时会被堵塞,当使命悉数完结后,主动被唤醒。WaitGroup运用 state2 用于完结信号量机制。经过调用 runtime_Semacquire()runtime_Semrelease() 函数,能够在不堵塞线程的情况下进行等候和告诉操作。

3.3 代码完结

3.3.1 Add办法

调用 Add() 办法增加/减小counter的值,delta的值能够是正数,也能够是负数,下面是Add办法的源码完结:

func (wg *WaitGroup) Add(delta int) {
   // delta 的值能够为负数,Done办法便是经过Add(-1)来完结的
   // statep: 为state1的地址  semap: 为state2的地址
   statep, semap := wg.state()
   // 高32位的值 加上 delta,增加使命计数器的值
   state := atomic.AddUint64(statep, uint64(delta)<<32)
   // v: 取高32位数据,获取到待完结使命数
   v := int32(state >> 32)
   // 取低32位数据,获取到等候线程的值
   w := uint32(state)
   // v > 0: 阐明还有待完结的使命数,此刻不应该唤醒等候协程
   // w = 0: 阐明没有协程在等候,此刻能够直接退出
   if v > 0 || w == 0 {
      return
   }     
   // 此刻v = 0,一切使命都完结了,唤醒等候协程
   *statep = 0
   for ; w != 0; w-- {
      runtime_Semrelease(semap, false, 0)
   }
}

3.3.2 Done办法完结

调用 Done() 办法表明完结了一个使命,经过调用Add办法,delta值为-1,削减使命计数器counter的值,当其归为0时,便主动唤醒一切处于等候的协程。

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
   wg.Add(-1)
}

3.3.3 Wait办法完结

调用Wait办法,等候使命履行完结,增加等候计数器Waiter的值:

func (wg *WaitGroup) Wait() {
   // statep: 为state1的地址  semap: 为state2的地址
   statep, semap := wg.state()
   for {
      // 加载state1的值
      state := atomic.LoadUint64(statep)
      // v: 取高32位数据,获取到待完结使命数
      v := int32(state >> 32)
      // 没有使命待履行,悉数都完结了
      if v == 0 {
         return
      }
      // 增加waiter计数器的值
      if atomic.CompareAndSwapUint64(statep, state, state+1) {
          // 等候被唤醒
         runtime_Semacquire(semap)
         return
      }
   }
}

3.4 完结弥补

Add办法,Done办法以及Wait办法完结中,有一些反常场景的验证逻辑被我删去掉了。当呈现反常场景时,阐明用户运用办法和WaitGroup的规划初衷相违背了,此刻WaitGroup就会直接panic。

下面经过阐明运用的注意事项,来直接介绍WaitGroup的反常验证逻辑。

4.运用注意事项

4.1 Add办法和Done办法需求成对呈现

下面是一个Add办法和Done办法没有成对呈现的比如,此刻Add办法调多了,此刻计数器永远大于0,Wait 办法会一直堵塞等候。

package main
import (
    "fmt"
    "sync"
)
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        fmt.Println("Goroutine 1")
    }()
    go func() {
        fmt.Println("Goroutine 2")
    }()
    wg.Wait()
    fmt.Println("All goroutines finished")
}

在上述代码中,咱们调用了wg.Add(2),但只调用了一次wg.Done()。这会导致counter的值大于0,因而调用wg.Wait()会被永久堵塞,不会持续向下持续履行。

还有别的一种情况时Done办法调用多了,此刻使命计数器counter的值为负数,从WaitGroup规划的语意来看,便是需求等候完结的使命数为负数,这个不符合预期,此刻将会直接panic

package main
import (
    "fmt"
    "sync"
)
func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        fmt.Println("Goroutine 1 started")
        wg.Done() // 第一次调用Done办法
        wg.Done() // 第二次调用Done办法
        fmt.Println("Goroutine 1 completed")
    }()
    wg.Wait()
    fmt.Println("All goroutines completed")
}

在上面的比如中,咱们发动了一个goroutine,第一次调用Add办法,counter的值变为1,在第14行调用Done,此刻计数器的值变为0,此刻等候中的goroutine将会被唤醒。在第15行又调用了一次Done办法,当counter减小为0时,再次调用Done办法会导致panic,由于此刻waitGroup的计数器现已为0,再次削减将导致负数计数,这是不被允许的。

所以在调用Done办法时,需求确保每次调用都与Add办法的调用一一对应,否则会导致程序呈现过错。

4.2 在一切使命都现已增加之后,才调用Wait办法进行等候

WaitGroup的规划初衷便是为了等候一组操作完结后再履行下一步操作。所以,假如在一切使命增加之前,便调用Wait办法进行等候,此刻有可能会导致等候协程提早被唤醒,履行下一步操作,而没有增加的使命则不会被等候,这违反了WaitGroup的规划初衷,也不符合预期。下面是一个简单的比如:

package main
import (
        "fmt"
        "sync"
        "time"
)
func main() {
        var wg sync.WaitGroup
        for i := 1; i <= 3; i++ {
           go func(id int) {
              wg.Add(1)
              defer wg.Done()
              fmt.Printf("Goroutine %d started\n", id)
              time.Sleep(time.Duration(id) * time.Second) 
              fmt.Printf("Goroutine %d finished\n", id)
           }(i)
        }
        // 不等候一切使命增加,就开端等候
        wg.Wait()
        fmt.Println("All goroutines finished")
        time.Sleep(10 * time.Second)
}

代码履行成果如下,等候协程被提早唤醒,履行之后的操作,而子使命在等候协程唤醒后才开端履行:

All goroutines finished
Goroutine 1 started
Goroutine 3 started
Goroutine 2 started
Goroutine 1 finished
Goroutine 2 finished
Goroutine 3 finished

在这个比如中,咱们创立了三个协程并打印出它们开端和结束的音讯。但是,咱们没有在使命开端前调用Add办法增加使命,而是在使命开端之后再调用Add办法增加使命。

这可能会导致某些使命未被参加到WaitGroup中,等候协程就调用了wg.Wait办法,这样就会导致一些使命未被参加WaitGrou,然后导致等候协程不会等候这些使命履行完结。假如这种情况发生了,咱们会看到”All goroutines finished”被输出,但实际上有一些协程还没有完结。

因而,咱们应该在一切使命增加结束之后再调用Wait办法,以确保等候的正确性。

5. WaitGroup常见运用场景

在函数或办法中运用,假如一个大使命能够拆分为多个独立的子使命,此刻会将其进行拆分,并运用多个协程来并发履行这些使命,进步履行功率,一起运用WaitGroup等候一切子使命履行完结,完结协程间的同步。

下面来看go-redis中ClusterClient结构体中ForEachMaster办法中关于WaitGroup的运用。ForEachMaster办法一般用于在 Redis 集群中履行针对一切主节点的某种操作,例如在集群中增加或删去键,或许履行一些大局的诊断操作,具体履行的操作由传入参数fn指定。

这儿ForEachMaster办法会对一切主节点履行某种操作,这儿的完结是对一切主节点履行某种操作这个大使命,拆分为多个独立的子使命,每个子使命完结对一个Master节点履行指定操作,然后每个子使命发动一个协程去履行,主协程运用WaitGroup等候一切协程完结指定子使命,ForEachMaster也就完结了对一切主节点履行某种操作的使命。具体完结如下:

func (c *ClusterClient) ForEachMaster(
   ctx context.Context,
   fn func(ctx context.Context, client *Client) error,
) error {
   // 从头加载集群状况,以确保状况信息是最新的
   state, err := c.state.ReloadOrGet(ctx)
   if err != nil {
      return err
   }
   var wg sync.WaitGroup
   // 用于协程间通讯
   errCh := make(chan error, 1)
    // 获取到redis集群中一切的master节点
   for _, master := range state.Masters {
      // 发动一个协程来履行该使命
      wg.Add(1)
      go func(node *clusterNode) {
         // 使命完结时,调用Done奉告WaitGroup使命已完结
         defer wg.Done()
         err := fn(ctx, node.Client)
         if err != nil {
            select {
            case errCh <- err:
            default:
            }
         }
      }(master)
   }
   // 主协程等候一切使命完结
   wg.Wait()
   return nil
 }

6.总结

本文介绍了 Go 语言中的 WaitGroup 并发原语,它供给了一种简单且强壮的机制来和谐多个 Goroutine 的履行。咱们首要学习了 WaitGroup 的根本运用办法,包含怎么创立 WaitGroup、怎么向计数器中增加值、怎么等候一切 Goroutine 完结以及怎么在 Goroutine 中告诉 WaitGroup 完结。

接着,咱们了解了 WaitGroup 的完结原理,包含计数器和等候计数器的完结。了解了完结原理之后,咱们能够更好地理解 WaitGroup 的内部机制以及怎么更好地运用它来完结咱们的需求。

在接下来的部分中,咱们介绍了一些运用 WaitGroup 的注意事项,以及常见的运用办法。基于此,咱们完结了对WaitGroup的介绍。