前语

池化技术是一种资源管理技术,它经过提早创立和保护一组可重用的资源实例池,以便在需求时快速分配和收回这些资源。协程(goroutine)是 Go 言语中一种愈加轻量级的 “线程”,然而大量的 goroutine 仍是十分耗费资源的。Worker Pool 经过池化技术能够保护一定数量的 goroutine,只让这些 goroutine 去履行使命,减少了资源的糟蹋。

本文将经过介绍 Worker Pool 的一种完成思路并经过剖析一个常用的 Worker Pool 的源码 gammazero/workerpool 并在最终自己完成一个相似的 Worker Pool VIOLIN 来帮助更好的了解 Worker Pool。

如何编写一个 Worker Pool?

经过前语中对 Worker Pool 的简略介绍,咱们知道了要完成一个 Worker Pool 就只需求保护一定数量的 goroutine 并由这些 goroutine 去处理提交的使命(task / job)的即可。于是 Worker Pool 的编写思路就变得十分简略了:

  • 为 Worker Pool 设置一个详细的 goroutine 数量上限,Worker Pool 保护的 goroutine 的值不能超出这个值;
  • 提交一个使命时从 Worker Pool 中分配一个 goroutine 去履行
  • 假如有闲暇 Worker(goroutine)就交给闲暇 Worker 履行,不然检查是否到达设置的 goroutine 数量上限
  • 到达上限就将使命缓存到一个行列中(或许其他处理方式),没有到达上限就创立一个新的 Worker 去履行

详细的逻辑能够检查以下示意图,何时消费缓存行列中的使命会在源码解析部分解说。

GO:如何编写一个 Worker Pool

源码剖析

大概理清了 Worker Pool 的工作思路咱们就能够开始写代码了,这儿咱们先阅览一个十分好用的 Worker Pool 的开源完成(GitHub 1.1k Star),项目地址在这儿。

主结构体 WorkerPool

咱们先看一下这个 Worker Pool 的主结构体:

type WorkerPool struct {
	maxWorkers   int
	taskQueue    chan func()
	workerQueue  chan func()
	stoppedChan  chan struct{}
	stopSignal   chan struct{}
	waitingQueue deque.Deque[func()]
	stopLock     sync.Mutex
	stopOnce     sync.Once
	stopped      bool
	waiting      int32
	wait         bool
}

咱们重点关注以下几个属性:

  • maxWorkers:Worker Pool 所保护的最大 goroutine 数目(也便是 Worker 数);
  • taskQueue: 使命行列,提交使命时将使命提交到使命行列中;
  • workerQueue: Worker 行列,Worker 会从 Worker 行列中获取详细需求履行的使命;
  • waitingQueue: 等候行列,在没有闲暇 Worker 而且当时 Worker 数到达设置的最大值时缓存待履行使命的行列;

需求留意的点为:

  • 三个 Queue 中存储的详细元素的数据类型都为 func() ,也便是提交的使命的数据类型,理解为一个函数即可;

  • 这儿运用了 taskQueue 和 workerQueue 两个行列,提交的使命时先提交到 taskQueue 中,再从 taskQueue 中传入 workerQueue 给 Worker 去履行的,而不是直接让 Worker 消费 taskQueue 中的使命;

  • waitingQueue 的数据类型为 workerpool 作者自己完成的数据结构,作为一般的行列(FIFO)理解即可。后面咱们自己完成的时分能够替换成现有的数据结构,比方 chanlist.List 等;

New

接下来咱们来看一下 New 函数,也便是创立 WorkerPool 目标的函数。

func New(maxWorkers int) *WorkerPool {
	// There must be at least one worker.
	if maxWorkers < 1 {
		maxWorkers = 1
	}
	pool := &WorkerPool{
		maxWorkers:  maxWorkers,
		taskQueue:   make(chan func()),
		workerQueue: make(chan func()),
		stopSignal:  make(chan struct{}),
		stoppedChan: make(chan struct{}),
	}
	// Start the task dispatcher.
	go pool.dispatch()
	return pool
}

能够看到整体仍是十分清晰的,最初的判别条件确保 WorkerPool 中至少保护一个 goroutine;然后初始化 WorkerPool 目标,需求留意的是 taskQueueworkerQueue 属性都为无缓冲的 channel;然后敞开一个协程运转 WorkerPool 的中心办法 pool.dispatch ;最终回来 WorkerPool 目标。

Submit

在看中心办法 pool.dispatch 之前咱们先来看几个简略的办法进步一下信心。

func (p *WorkerPool) Submit(task func()) {
	if task != nil {
		p.taskQueue <- task
	}
}

Submit 便是咱们用来提交使命需求调用的办法,直接判别不为 nil 后传入 taskQueue 即可。顺便码一下假如往 chan 中传 nil 的状况,虽然能够正常接收 nil 可是运转的话会报空指针反常。

func main() {
	ch := make(chan func())
	go func() {
		ch <- nil
	}()
	res, ok := <-ch
	fmt.Println(res, ok) // <nil> true
	res()                // panic: runtime error: invalid memory address or nil pointer dereference
}

SubmitWait

Submit 的孪生办法,不同的是提交使命后会堵塞等候提交的使命履行完毕。

func (p *WorkerPool) SubmitWait(task func()) {
	if task == nil {
		return
	}
	doneChan := make(chan struct{})
	p.taskQueue <- func() {
		task()
		close(doneChan)
	}
	<-doneChan
}

详细完成是对提交的 task 包装一次经过再经过 channel 来完成,十分巧妙。

dispatch

WorkerPool 的中心办法,包含了前面剖析的 Worker Pool 处理的完好流程。

这是一个十分长的办法,为了便于解说和理解这儿我先列出完好的办法然后接下来会逐一代码段的解释,读者能够自己 clone 这个项目对照的阅览也能够直接看下面的代码段解析。

func (p *WorkerPool) dispatch() {
	defer close(p.stoppedChan)
	timeout := time.NewTimer(idleTimeout)
	var workerCount int
	var idle bool
	var wg sync.WaitGroup
Loop:
	for {
		// As long as tasks are in the waiting queue, incoming tasks are put
		// into the waiting queue and tasks to run are taken from the waiting
		// queue. Once the waiting queue is empty, then go back to submitting
		// incoming tasks directly to available workers.
		if p.waitingQueue.Len() != 0 {
			if !p.processWaitingQueue() {
				break Loop
			}
			continue
		}
		select {
		case task, ok := <-p.taskQueue:
			if !ok {
				break Loop
			}
			// Got a task to do.
			select {
			case p.workerQueue <- task:
			default:
				// Create a new worker, if not at max.
				if workerCount < p.maxWorkers {
					wg.Add(1)
					go worker(task, p.workerQueue, &wg)
					workerCount++
				} else {
					// Enqueue task to be executed by next available worker.
					p.waitingQueue.PushBack(task)
					atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
				}
			}
			idle = false
		case <-timeout.C:
			// Timed out waiting for work to arrive. Kill a ready worker if
			// pool has been idle for a whole timeout.
			if idle && workerCount > 0 {
				if p.killIdleWorker() {
					workerCount--
				}
			}
			idle = true
			timeout.Reset(idleTimeout)
		}
	}
	// If instructed to wait, then run tasks that are already queued.
	if p.wait {
		p.runQueuedTasks()
	}
	// Stop all remaining workers as they become ready.
	for workerCount > 0 {
		p.workerQueue <- nil
		workerCount--
	}
	wg.Wait()
	timeout.Stop()
}

其实整个办法看下来仍是十分清晰的,作者也很贴心的在首要的逻辑分支处补充了注释。

  • 咱们先来看这个办法最初的变量声明部分:

    defer close(p.stoppedChan)
    timeout := time.NewTimer(idleTimeout)
    var workerCount int
    var idle bool
    var wg sync.WaitGroup
    
    • timeout:WorkerPool 为保护的 Worker 设置了闲暇超时时刻,即假如一个闲暇的 Worker 能够存活的最大时刻,避免闲暇的 Worker 一直占用系统资源,而且这个时刻已经在内部声明死了,为 2 秒(2 * time.Second);
    • workerCount:用于记载当时运转的 Worker 数量,和装备的答应保护的最大 Worker 数量进行比对;
    • idle:超时符号位,初始为 false,假如有 Worker 超时置为 true,创立新 Worker 或将使命添加到缓存行列后置为 false
    • wg:每创立一个新的 Worker 加 1,堵塞等候直到所有的 Worker 完毕销毁。
  • 办法接下来便是一个巨大的 for 循环,其间还有 select 的嵌套,咱们先越过对 waitingQueue 的判别逻辑,直接看首要的 select 部分:

    LOOP:
    	for {
            // ... (暂时越过的部分)
            select {
            case task, ok := <-p.taskQueue:
                if !ok {
                    break Loop
                }
                // Got a task to do.
                select {
                case p.workerQueue <- task:
                default:
                    // Create a new worker, if not at max.
                    if workerCount < p.maxWorkers {
                        wg.Add(1)
                        go worker(task, p.workerQueue, &wg)
                        workerCount++
                    } else {
                        // Enqueue task to be executed by next available worker.
                        p.waitingQueue.PushBack(task)
                        atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
                    }
                }
                idle = false
            case <-timeout.C:
                // Timed out waiting for work to arrive. Kill a ready worker if
                // pool has been idle for a whole timeout.
                if idle && workerCount > 0 {
                    if p.killIdleWorker() {
                        workerCount--
                    }
                }
                idle = true
                timeout.Reset(idleTimeout)
            }
        }
    // ...
    
    • for 进来的第一个 select 的两个 case 一个为接收到使命,另一个为 Worker 超时即两秒种没有接收到新使命;

    • 先看接收到使命的 case,首先判别是否实践接收到 task,假如 okfalse,说明 taskQueue 这个 channel 已经被 close 了,直接退出循环,这儿运用 label 的原因是在 for-select 嵌套的 select 中,假如直接 break 只会退出当时的 select,继续履行 for 以内 select 以外的代码并继续下次 for 循环,不会直接退出外面的 for 循环,假如想退出外面的 for 循环,则需求借助 label 来完成。

      我的上篇文章对 for-select 中的 break,continue 和 return 的行为进行了测验和记载,能够参阅。

      然后再经过一个 select ,假如有闲暇的 Worker 就将使命分配给他即 p.workerQueue <- task,不然判别当时运转的 Worker 数是否到达上限,没有的话就敞开一个新的 goroutine 即 Worker,并把刚开始声明的 workerCountsync.WaitGroup +1,dispatch 办法也不会有并发问题所以能够直接加。Worker 数量到达上限的话就将当时使命缓存到行列中,并保存当时的等候使命数。p.waiting 是共享资源所以需求经过原子操作来确保并发安全。

    • 超时的 case 先判别 idle 为是否为 true 而且 Worker 数量大于 0,所以第一次超时不会去 kill 一个 Worker,只会把 idle 方位为 true 而且重置计时器,值得一提的是这儿的 p.killIdleWorker 办法不一定会真实 kill 一个 Worker,只是去尝试做,并将成果回来,假如成功则将 Worker 数减一。

  • 现在咱们看刚刚越过的部分,也便是 for 循环一进来:

    if p.waitingQueue.Len() != 0 {
        if !p.processWaitingQueue() {
            break Loop
        }
        continue
    }
    

    这一部分便是对缓存的使命进行消费,p.processWaitingQueue 办法会消费一个缓存的使命或许将进来的新使命进行缓存。相同进行判别假如 taskQueue 被封闭了直接 break Loop 退出循环。

  • 最终一部分是 for 循环外的收尾代码:

    // If instructed to wait, then run tasks that are already queued.
    if p.wait {
        p.runQueuedTasks()
    }
    // Stop all remaining workers as they become ready.
    for workerCount > 0 {
        p.workerQueue <- nil
        workerCount--
    }
    wg.Wait()
    timeout.Stop()
    

    这儿的 p.wait 位是在 Stop 的时分运用的符号位咱们能够先不用看,下面的便是中止所有的 Worker 并中止 timer。

worker

WorkerPool 经过 go worker(task, p.workerQueue, &wg) 敞开一个新的 Worker,能够看到办法的形参为需求履行的 task,workerQueue 和 WaitGroup。在判别 task 不为 nil 后履行 task 并堵塞接收新的 task,直到收到的 task 为 nil 完毕循环并将信号减一。十分巧妙,我自己写的话必定想不到这种写法ww

// worker executes tasks and stops when it receives a nil task.
func worker(task func(), workerQueue chan func(), wg *sync.WaitGroup) {
	for task != nil {
		task()
		task = <-workerQueue
	}
	wg.Done()
}

以上便是对 workerpool 的中心源码解析,接下来咱们能够自己完成一个 Worker Pool。

自己完成

VIOLIN 是我参阅 gammazero/workerpool 思路写的一个 Worker Pool,供给了愈加丰厚的 API 和选项,欢迎 Star :)

WorkerNum

WorkerNum 是 VIOLIN 供给的获取当时运转 Worker 数量的接口,将 Worker 的数量变为主结构体的一个属性进行保护而不是相似 workerpool 中将 workerCount 作为 dispatch 办法的一个局部变量;但相对的,需求运用原子操作来确保并发安全,所以会带来性能上的损耗。

type Violin struct {
	options *options
	mu   sync.Mutex
	once sync.Once
	workerNum      uint32
	taskNum        uint32
	waitingTaskNum uint32
	status         uint32
	workerChan   chan func()
	taskChan     chan func()
	waitingChan  chan func()
	dismissChan  chan struct{}
	pauseChan    chan struct{}
	shutdownChan chan struct{}
}

Functional Options 形式

Functional Options 是我个人十分喜欢运用的一种编程形式,VIOLIN 经过 Functional Options 形式供给了三种装备选项并有对应的默认值,详细如下表所示:

选项 默认值 描述
WithMaxWorkers 5 设置 Worker 数量上限
WithWaitingQueueSize 64 设置使命缓存行列巨细
WithWorkerIdleTimeout time.Second * 3 设置闲暇 Worker 超时时刻
  • 能够看到相比 workerpool 设置死的 2 秒 Worker idleTimeout ,VIOLIN 答应用户自己进行装备;
  • 而且 VIOLIN 将使命缓存行列的数据结构从行列换成了 Channel,所以需求用户设置 Channel 的巨细或许直接运用默认值;后续应该也会换成行列完成,不再约束缓存行列的巨细;

Status

VIOLIN 在主结构体保护 status 字段方便用户和内部获取 VIOLIN 地点的状态,一共有如下 4 个状态:

const (
	_ uint32 = iota
	statusInitialized
	statusPlaying
	statusCleaning
	statusShutdown
)
  • statusInitialized:初始化
  • statusPlaying:正常运转
  • statusCleaning:调用 ShutdownWait 后等候缓存使命行列中的使命履行完
  • statusShutdown: Shutdown 封闭

用户能够经过供给的 IsPlayingIsCleaningIsShutdown 办法来判别 VIOLIN 的状态。

遇到的问题

以下是一个我在编写 VIOLIN 时遇到的问题,其实很多都是逻辑搞的复杂了然后自己脑测不过来,导致的很多并发或许流程问题;比方 send on closed channel,data race 等

if int(v.WorkerNum()) < v.MaxWorkerNum() {
    _ = atomic.AddUint32(&v.workerNum, 1)
    go v.recruit(wg)
}

这是判别当时运转 Worker 数量小于设置的最大数量时就创立一个新的 Worker 的逻辑。初版的代码中经过原子操作将 workerNum 添加 1 的代码放在了 recruit 函数里,这就导致了有时分会出现实践运转的 Worker 数量超出最大值几倍的现象。找了半天后发现是这儿的履行顺序问题。

解决方案便是判别逻辑后马上添加值,不然由于循环的速度大于启动 goroutine 再在 recruit 函数里原子性 +1 的速度,导致一次相同的判别却创立多个 goroutine。

总结

以上便是本篇文章的所有内容了,咱们从编写 Worker Pool 的思路动身,到阅览一个 Worker Pool 的开源完成再到自己完成一个 Worker Pool,希望能够对你理解和运用 Worker Pool 有所帮助。假如哪里写错了或许有问题欢迎谈论或许私聊指出ww

参阅列表

  • github.com/gammazero/w…
  • github.com/B1NARY-GR0U…