# A More Readable Concurrency Library for Go

## Preface

Hello everyone, I'm asong. A few days ago, while browsing GitHub, I came across an interesting concurrency library, conc, whose stated goals are:
- Make goroutine leaks harder to write
- Handle panics more gracefully
- Make concurrent code more readable
From the introduction, its main features are:

- A wrapper around WaitGroup that eliminates a lot of repetitive code and also wraps recover, making it safer to use
- panics.Catcher, which encapsulates the recover logic so panics are captured uniformly, together with call-stack information
- A worker pool for running tasks concurrently: the degree of concurrency is configurable, goroutines are reused, typed function signatures are supported, and a stream variant guarantees ordered results
- ForEach and map methods for processing slices elegantly

Next, let's walk through the library module by module.

Repository: github.com/sourcegraph…
## The WaitGroup Wrapper

The Go standard library provides sync.WaitGroup for waiting on goroutines, and we usually end up writing code like this:
```go
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            defer func() {
                // recover from a panic in this goroutine
                if err := recover(); err != nil {
                    fmt.Println(err)
                }
            }()
            // do something
            handle()
        }()
    }
    wg.Wait()
}
```
The code above forces us to repeat a pile of boilerplate, and every func has to handle its own recover logic, so the conc library wraps all of this up. The simplified version looks like this:
```go
func main() {
    wg := conc.NewWaitGroup()
    for i := 0; i < 10; i++ {
        wg.Go(doSomething)
    }
    wg.Wait()
}

func doSomething() {
    fmt.Println("test")
}
```
The conc wrapper itself is fairly simple; the struct looks like this:
```go
type WaitGroup struct {
    wg sync.WaitGroup
    pc panics.Catcher
}
```
It implements its own Catcher type to encapsulate the recover logic, along these lines:
```go
type Catcher struct {
    recovered atomic.Pointer[RecoveredPanic]
}
```
recovered is an atomic pointer; RecoveredPanic wraps the recovered panic value along with stack information:
```go
type RecoveredPanic struct {
    // The original value of the panic.
    Value any
    // The caller list as returned by runtime.Callers when the panic was
    // recovered. Can be used to produce a more detailed stack information with
    // runtime.CallersFrames.
    Callers []uintptr
    // The formatted stacktrace from the goroutine where the panic was recovered.
    // Easier to use than Callers.
    Stack []byte
}
```
It provides a Try method that runs a function under the catcher; only the first panicking goroutine's information is recorded:
```go
func (p *Catcher) Try(f func()) {
    defer p.tryRecover()
    f()
}

func (p *Catcher) tryRecover() {
    if val := recover(); val != nil {
        rp := NewRecoveredPanic(1, val)
        // only the first panic is recorded
        p.recovered.CompareAndSwap(nil, &rp)
    }
}
```
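A Catcher can also be used on its own. Here is a minimal sketch using Try together with the Recovered accessor shown just below:

```go
var pc panics.Catcher
pc.Try(func() { panic("oops") }) // the panic is caught, not propagated
if rp := pc.Recovered(); rp != nil {
    fmt.Println(rp.Value)         // "oops"
    fmt.Println(string(rp.Stack)) // formatted stacktrace
}
```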
It also provides a Repanic() method to re-throw the captured panic:
```go
func (p *Catcher) Repanic() {
    if val := p.Recovered(); val != nil {
        panic(val)
    }
}

func (p *Catcher) Recovered() *RecoveredPanic {
    return p.recovered.Load()
}
```
WaitGroup correspondingly provides both a Wait() and a WaitAndRecover() method:
```go
func (h *WaitGroup) Wait() {
    h.wg.Wait()
    // Propagate a panic if we caught one from a child goroutine.
    h.pc.Repanic()
}

func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
    h.wg.Wait()
    // Return a recovered panic if we caught one from a child goroutine.
    return h.pc.Recovered()
}
```
The Wait method re-panics as soon as any child goroutine has panicked, which is simple and blunt; WaitAndRecover instead returns the recovered information from the first goroutine that panicked.
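For example, a small sketch of the WaitAndRecover flavor:

```go
wg := conc.NewWaitGroup()
wg.Go(func() { panic("boom") })
// WaitAndRecover blocks like Wait, but hands the panic back
// instead of re-throwing it.
if rp := wg.WaitAndRecover(); rp != nil {
    fmt.Println(rp.Value) // "boom"
}
```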
To summarize: conc's WaitGroup wrapper is quite well done overall and noticeably reduces repetitive code.
## Worker Pool

conc provides several flavors of worker pool:
- ContextPool: a pool that carries a context; if one goroutine errors, the others can be cancelled
- ErrorPool: configurable to collect either only the first error or all errors
- ResultContextPool: cancels the other goroutines when one errors, and also collects the errors
- ResultPool: collects the result of every task in the pool, without guaranteeing order; for ordered results use stream or iter.Map (a quick sketch of ResultPool follows this list)
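Something like this (a sketch; pool.NewWithResults is the generic constructor for a ResultPool):

```go
p := pool.NewWithResults[int]()
for i := 0; i < 3; i++ {
    i := i
    p.Go(func() int { return i * i })
}
results := p.Wait() // e.g. [0 1 4]; ordering is not guaranteed
fmt.Println(results)
```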
Here is a full example, using ContextPool:
import "github.com/sourcegraph/conc/pool"
func ExampleContextPool_WithCancelOnError() {
p := pool.New().
WithMaxGoroutines(4).
WithContext(context.Background()).
WithCancelOnError()
for i := 0; i < 3; i++ {
i := i
p.Go(func(ctx context.Context) error {
if i == 2 {
return errors.New("I will cancel all other tasks!")
}
<-ctx.Done()
return nil
})
}
err := p.Wait()
fmt.Println(err)
// Output:
// I will cancel all other tasks!
}
When creating a pool, the following configuration methods are available (a sketch using the error-related options follows this list):

- p.WithMaxGoroutines(): sets the maximum number of goroutines in the pool
- p.WithErrors(): makes the pool's tasks return an error
- p.WithContext(ctx): runs the pool's tasks with a context; combined with WithCancelOnError, the first error cancels the remaining tasks
- p.WithFirstError(): makes the pool return only the first error rather than all of them
- p.WithCollectErrored(): makes a result pool collect a task's result even when the task errored
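A minimal sketch of the error-related options (WithErrors turns the pool into an ErrorPool whose tasks return error):

```go
p := pool.New().WithErrors().WithFirstError()
for i := 0; i < 3; i++ {
    i := i
    p.Go(func() error {
        if i > 0 {
            return fmt.Errorf("task %d failed", i)
        }
        return nil
    })
}
// With WithFirstError only a single error comes back; without it,
// Wait returns all collected errors combined.
err := p.Wait()
fmt.Println(err)
```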
The pool's underlying structure is:
```go
type Pool struct {
    handle   conc.WaitGroup
    limiter  limiter
    tasks    chan func()
    initOnce sync.Once
}
```
limiter is the throttle, a channel used to cap the number of goroutines:
```go
type limiter chan struct{}

func (l limiter) limit() int {
    return cap(l)
}

func (l limiter) release() {
    if l != nil {
        <-l
    }
}
```
The pool's core logic is fairly simple. If no limiter is set, a task goes to an idle worker if there is one, and otherwise a new worker is spawned and the task is handed to it. If a limiter is set and the worker count has hit the cap, the task is handed to an idle worker, blocking until one becomes free:
```go
func (p *Pool) Go(f func()) {
    p.init()

    if p.limiter == nil {
        // no limit set
        select {
        case p.tasks <- f:
            // A goroutine was available to handle the task.
        default:
            // No goroutine was available to handle the task.
            // Spawn a new one and send it the task.
            p.handle.Go(p.worker)
            p.tasks <- f
        }
    } else {
        select {
        case p.limiter <- struct{}{}:
            // If we are below our limit, spawn a new worker rather
            // than waiting for one to become available.
            p.handle.Go(p.worker)
            // We know there is at least one worker running, so wait
            // for it to become available. This ensures we never spawn
            // more workers than the number of tasks.
            p.tasks <- f
        case p.tasks <- f:
            // A worker is available and has accepted the task.
            return
        }
    }
}
```
The workers here consume tasks from an unbuffered channel. This reuse scheme is clever: if the running goroutines finish quickly, tasks are handed to them instead of spawning ever more goroutines.
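The worker referenced by p.handle.Go(p.worker) is essentially a loop that drains the shared task channel; a simplified sketch of what it looks like:

```go
func (p *Pool) worker() {
    // Give the limiter slot back on exit (release is a no-op
    // when no limiter is configured).
    defer p.limiter.release()
    // Block on the unbuffered tasks channel until the pool shuts it down.
    for f := range p.tasks {
        f()
    }
}
```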
Tasks processed through a pool are not guaranteed to finish in order, so conc also provides Stream, whose results preserve submission order.
## Stream

Stream is also built on pool, with an extra layer on top that keeps results in order. First, an example:
```go
func ExampleStream() {
    times := []int{20, 52, 16, 45, 4, 80}
    stream := stream2.New()
    for _, millis := range times {
        dur := time.Duration(millis) * time.Millisecond
        stream.Go(func() stream2.Callback {
            time.Sleep(dur)
            // This will print in the order the tasks were submitted
            return func() { fmt.Println(dur) }
        })
    }
    stream.Wait()
    // Output:
    // 20ms
    // 52ms
    // 16ms
    // 45ms
    // 4ms
    // 80ms
}
```
The Stream struct looks like this:
```go
type Stream struct {
    pool             pool.Pool
    callbackerHandle conc.WaitGroup
    queue            chan callbackCh
    initOnce         sync.Once
}
```
queue is a channel whose elements, of type callbackCh, are themselves channels of functions:

```go
type callbackCh chan func()
```
When a task is submitted, a callbackCh is created and enqueued in submission order to carry the result back:
```go
func (s *Stream) Go(f Task) {
    s.init()

    // Get a channel from the cache.
    ch := getCh()

    // Queue the channel for the callbacker.
    s.queue <- ch

    // Submit the task for execution.
    s.pool.Go(func() {
        defer func() {
            // In the case of a panic from f, we don't want the callbacker to
            // starve waiting for a callback from this channel, so give it an
            // empty callback.
            if r := recover(); r != nil {
                ch <- func() {}
                panic(r)
            }
        }()

        // Run the task, sending its callback down this task's channel.
        callback := f()
        ch <- callback
    })
}

var callbackChPool = sync.Pool{
    New: func() any {
        return make(callbackCh, 1)
    },
}

func getCh() callbackCh {
    return callbackChPool.Get().(callbackCh)
}

func putCh(ch callbackCh) {
    callbackChPool.Put(ch)
}
```
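On the receiving side, a single callbacker goroutine drains queue in FIFO order, which is what makes the results ordered; a simplified sketch of that loop:

```go
func (s *Stream) callbacker() {
    // Channels come out of queue in submission order, so callbacks
    // run in submission order even when tasks finish out of order.
    for ch := range s.queue {
        callback := <-ch // wait for this task's callback
        callback()
        putCh(ch) // return the channel to the sync.Pool
    }
}
```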
## ForEach and Map

### ForEach
conc provides a ForEach method for processing a slice concurrently and elegantly. It is built with generics, so we only have to write the handler itself, with no boilerplate around it. Here is a hand-written example:
```go
func main() {
    input := []int{1, 2, 3, 4}
    iterator := iter.Iterator[int]{
        MaxGoroutines: len(input) / 2,
    }
    iterator.ForEach(input, func(v *int) {
        if *v%2 != 0 {
            *v = -1
        }
    })
    fmt.Println(input)
}
```
Internally, ForEach is implemented by the Iterator struct; the core logic is:
```go
type Iterator[T any] struct {
    MaxGoroutines int
}

func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
    if iter.MaxGoroutines == 0 {
        // iter is a value receiver and is hence safe to mutate
        iter.MaxGoroutines = defaultMaxGoroutines()
    }

    numInput := len(input)
    if iter.MaxGoroutines > numInput {
        // No more concurrent tasks than the number of input items.
        iter.MaxGoroutines = numInput
    }

    var idx atomic.Int64
    // the atomic index lets all goroutines share a single closure
    task := func() {
        i := int(idx.Add(1) - 1)
        for ; i < numInput; i = int(idx.Add(1) - 1) {
            f(i, &input[i])
        }
    }

    var wg conc.WaitGroup
    for i := 0; i < iter.MaxGoroutines; i++ {
        wg.Go(task)
    }
    wg.Wait()
}
```
The number of concurrent goroutines is configurable and defaults to GOMAXPROCS. The concurrency here is nicely designed: only a single closure is created, and idx is advanced atomically, avoiding the GC pressure of allocating a closure per task.
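When the element index matters, the package-level ForEachIdx can be used the same way; for example, to fill a parallel results slice (a small sketch):

```go
input := []int{1, 2, 3, 4}
out := make([]string, len(input))
iter.ForEachIdx(input, func(i int, v *int) {
    // Every goroutine writes to distinct indices, so this is race-free.
    out[i] = fmt.Sprintf("%d->%d", i, *v*2)
})
fmt.Println(out) // [0->2 1->4 2->6 3->8]
```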
### Map
conc's Map method produces a result for each element of the slice. Using Map improves readability and removes boilerplate. An example:
```go
func main() {
    input := []int{1, 2, 3, 4}
    mapper := iter.Mapper[int, bool]{
        MaxGoroutines: len(input) / 2,
    }
    results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
    fmt.Println(results)
    // Output:
    // [false true false true]
}
```
Map is also built on Iterator and likewise calls ForEachIdx; the difference from ForEach is that it records the result of each call.
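Conceptually, Map is just ForEachIdx plus a results slice; a minimal sketch of the idea (not the library's literal code):

```go
// mapSketch runs f concurrently over input via ForEachIdx and records
// each result at the index of its input element, preserving order.
func mapSketch[T, R any](input []T, f func(*T) R) []R {
    results := make([]R, len(input))
    iter.ForEachIdx(input, func(i int, t *T) {
        results[i] = f(t)
    })
    return results
}
```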
## Summary

I spent half a day reading through this library, and many of its design points are worth learning from. To sum up what I took away:
- conc.WaitGroup wraps sync.WaitGroup, folding Add, Done and recover handling into it, which improves readability and removes boilerplate
- The ForEach and Map methods handle slices concurrently and elegantly with concise, readable code; internally, Iterator drives all goroutines through a single closure via an atomic index, avoiding GC pressure
- pool is a concurrent worker queue with a configurable number of goroutines; the implementation is clever, using an unbuffered channel as the hand-off to workers so that no extra goroutines are spawned when the existing ones keep up
- stream is a concurrent worker queue that preserves order; it is also cleverly implemented, using a sync.Pool of callback channels enqueued at submission time to replay results in order, which is well worth studying
If you have some time, take a look at this concurrency library and learn from its strengths; improvement comes little by little~

That's all for this post. I'm asong, and I'll see you next time.

Feel free to follow my WeChat public account: Golang梦工厂