原文地址:# Go言语完成的可读性更高的并发神库

前语

哈喽,大家好,我是asong;前几天逛github发现了一个有趣的并发库-conc,其方针是:

  • 更难出现goroutine泄漏
  • 处理panic更友好
  • 并发代码可读性高

从简介上看主要封装功能如下:

  • waitGroup进行封装,防止了产生很多重复代码,而且也封装recover,安全性更高
  • 供给panics.Catcher封装recover逻辑,统一捕获panic,打印调用栈一些信息
  • 供给一个并发履行使命的worker池,能够操控并发度、goroutine能够进行复用,支持函数签名,一起供给了stream办法来确保成果有序
  • 供给ForEachmap办法高雅的处理切片

接下来就区分模块来介绍一下这个库;

库房地址:github.com/sourcegraph…

WaitGroup的封装

Go言语规范库有供给sync.waitGroup操控等待goroutine,咱们一般会写出如下代码:

func main(){
    var wg sync.WaitGroup
    for i:=0; i < 10; i++{
        wg.Add(1)
        go func() {
            defer wg.Done()
            defer func() {
                // recover panic
                err := recover()
                if err != nil {
                    fmt.Println(err)
                }
            }
            // do something
            handle()
        }
    }
    wg.Wait()
}

上述代码咱们需求些一堆重复代码,而且需求单独在每一个func中处理recover逻辑,所以conc库对其进行了封装,代码简化如下:

func main() {
	wg := conc.NewWaitGroup()
	for i := 0; i < 10; i++ {
		wg.Go(doSomething)
	}
	wg.Wait()
}
func doSomething() {
	fmt.Println("test")
}

conc库封装也比较简单,结构如下:

type WaitGroup struct {
	wg sync.WaitGroup
	pc panics.Catcher
}

其自己完成了Catcher类型对recover逻辑进行了封装,封装思路如下:

type Catcher struct {
	recovered atomic.Pointer[RecoveredPanic]
}

recovered是原子指针类型,RecoveredPanic是捕获的recover封装,封装了堆栈等信息:

type RecoveredPanic struct {
	// The original value of the panic.
	Value any
	// The caller list as returned by runtime.Callers when the panic was
	// recovered. Can be used to produce a more detailed stack information with
	// runtime.CallersFrames.
	Callers []uintptr
	// The formatted stacktrace from the goroutine where the panic was recovered.
	// Easier to use than Callers.
	Stack []byte
}

供给了Try办法履行办法,只会记载第一个panic的goroutine信息:

func (p *Catcher) Try(f func()) {
	defer p.tryRecover()
	f()
}
func (p *Catcher) tryRecover() {
	if val := recover(); val != nil {
		rp := NewRecoveredPanic(1, val)
        // 只会记载第一个panic的goroutine信息
		p.recovered.CompareAndSwap(nil, &rp)
	}
}

供给了Repanic()办法用来重放捕获的panic:

func (p *Catcher) Repanic() {
	if val := p.Recovered(); val != nil {
		panic(val)
	}
}
func (p *Catcher) Recovered() *RecoveredPanic {
	return p.recovered.Load()
}

waitGroup对此也别离供给了Wait()WaitAndRecover()办法:

func (h *WaitGroup) Wait() {
	h.wg.Wait()
	// Propagate a panic if we caught one from a child goroutine.
	h.pc.Repanic()
}
func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
	h.wg.Wait()
	// Return a recovered panic if we caught one from a child goroutine.
	return h.pc.Recovered()
}

wait办法只需有一个goroutine产生panic就会向上抛出panic,比较简单粗犷;

waitAndRecover办法只有有一个goroutine产生panic就会回来第一个recover的goroutine信息;

总结:conc库对waitGrouop的封装总体是比较不错的,能够减少重复的代码;

worker池

conc供给了几种类型的worker池:

  • ContextPool:能够传递context的pool,若有goroutine产生过错能够cancel其他goroutine
  • ErrorPool:经过参数能够操控只搜集第一个error还是一切error
  • ResultContextPool:若有goroutine产生过错会cancel其他goroutine而且搜集过错
  • RestultPool:搜集work池中每个使命的履行成果,并不能确保次序,确保次序需求运用stream或者iter.map;

咱们来看一个简单的例子:

import "github.com/sourcegraph/conc/pool"
func ExampleContextPool_WithCancelOnError() {
	p := pool.New().
		WithMaxGoroutines(4).
		WithContext(context.Background()).
		WithCancelOnError()
	for i := 0; i < 3; i++ {
		i := i
		p.Go(func(ctx context.Context) error {
			if i == 2 {
				return errors.New("I will cancel all other tasks!")
			}
			<-ctx.Done()
			return nil
		})
	}
	err := p.Wait()
	fmt.Println(err)
	// Output:
	// I will cancel all other tasks!
}

在创立pool时有如下办法能够调用:

  • p.WithMaxGoroutines()配置pool中goroutine的最大数量
  • p.WithErrors:配置pool中的task是否回来error
  • p.WithContext(ctx):配置pool中运行的task当遇到第一个error要撤销
  • p.WithFirstError:配置pool中的task只回来第一个error
  • p.WithCollectErrored:配置pool的task搜集一切error

pool的根底结构如下:

type Pool struct {
	handle   conc.WaitGroup
	limiter  limiter
	tasks    chan func()
	initOnce sync.Once
}

limiter是操控器,用chan来操控goroutine的数量:

type limiter chan struct{}
func (l limiter) limit() int {
	return cap(l)
}
func (l limiter) release() {
	if l != nil {
		<-l
	}
}

pool的中心逻辑也比较简单,假如没有设置limiter,那么就看有没有闲暇的worker,不然就创立一个新的worker,然后投递使命进去;

假如设置了limiter,达到了limiter worker数量上限,就把使命投递给闲暇的worker,没有闲暇就阻塞等着;

func (p *Pool) Go(f func()) {
	p.init()
	if p.limiter == nil {
		// 没有约束
		select {
		case p.tasks <- f:
			// A goroutine was available to handle the task.
		default:
			// No goroutine was available to handle the task.
			// Spawn a new one and send it the task.
			p.handle.Go(p.worker)
			p.tasks <- f
		}
	} else {
		select {
		case p.limiter <- struct{}{}:
			// If we are below our limit, spawn a new worker rather
			// than waiting for one to become available.
			p.handle.Go(p.worker)
			// We know there is at least one worker running, so wait
			// for it to become available. This ensures we never spawn
			// more workers than the number of tasks.
			p.tasks <- f
		case p.tasks <- f:
			// A worker is available and has accepted the task.
			return
		}
	}
}

这儿work运用的是一个无缓冲的channel,这种复用办法很奇妙,假如goroutine履行很快防止创立过多的goroutine;

运用pool处理使命不能确保有序性,conc库又供给了Stream办法,回来成果能够坚持次序;

Stream

Steam的完成也是依赖于pool,在此根底上做了封装确保成果的次序性,先看一个例子:

func ExampleStream() {
	times := []int{20, 52, 16, 45, 4, 80}
	stream := stream2.New()
	for _, millis := range times {
		dur := time.Duration(millis) * time.Millisecond
		stream.Go(func() stream2.Callback {
			time.Sleep(dur)
			// This will print in the order the tasks were submitted
			return func() { fmt.Println(dur) }
		})
	}
	stream.Wait()
	// Output:
	// 20ms
	// 52ms
	// 16ms
	// 45ms
	// 4ms
	// 80ms
}

stream的结构如下:

type Stream struct {
	pool             pool.Pool
	callbackerHandle conc.WaitGroup
	queue            chan callbackCh
	initOnce sync.Once
}

queue是一个channel类型,callbackCh也是channel类型 – chan func():

type callbackCh chan func()

在提交goroutine时按照次序生成callbackCh传递成果:

func (s *Stream) Go(f Task) {
	s.init()
	// Get a channel from the cache.
	ch := getCh()
	// Queue the channel for the callbacker.
	s.queue <- ch
	// Submit the task for execution.
	s.pool.Go(func() {
		defer func() {
			// In the case of a panic from f, we don't want the callbacker to
			// starve waiting for a callback from this channel, so give it an
			// empty callback.
			if r := recover(); r != nil {
				ch <- func() {}
				panic(r)
			}
		}()
		// Run the task, sending its callback down this task's channel.
		callback := f()
		ch <- callback
	})
}
var callbackChPool = sync.Pool{
	New: func() any {
		return make(callbackCh, 1)
	},
}
func getCh() callbackCh {
	return callbackChPool.Get().(callbackCh)
}
func putCh(ch callbackCh) {
	callbackChPool.Put(ch)
}

ForEach和map

ForEach

conc库供给了ForEach办法能够高雅的并发处理切片,看一下官方的例子:

Go语言实现的可读性更高的并发神库

conc库运用泛型进行了封装,咱们只需求关注handle代码即可,防止冗余代码,咱们自己动手写一个例子:

func main() {
	input := []int{1, 2, 3, 4}
	iterator := iter.Iterator[int]{
		MaxGoroutines: len(input) / 2,
	}
	iterator.ForEach(input, func(v *int) {
		if *v%2 != 0 {
			*v = -1
		}
	})
	fmt.Println(input)
}

ForEach内部完成为Iterator结构及中心逻辑如下:

type Iterator[T any] struct {
	MaxGoroutines int
}
func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
	if iter.MaxGoroutines == 0 {
		// iter is a value receiver and is hence safe to mutate
		iter.MaxGoroutines = defaultMaxGoroutines()
	}
	numInput := len(input)
	if iter.MaxGoroutines > numInput {
		// No more concurrent tasks than the number of input items.
		iter.MaxGoroutines = numInput
	}
	var idx atomic.Int64
	// 经过atomic操控仅创立一个闭包
	task := func() {
		i := int(idx.Add(1) - 1)
		for ; i < numInput; i = int(idx.Add(1) - 1) {
			f(i, &input[i])
		}
	}
	var wg conc.WaitGroup
	for i := 0; i < iter.MaxGoroutines; i++ {
		wg.Go(task)
	}
	wg.Wait()
}

能够设置并发的goroutine数量,默许取的是GOMAXPROCS ,也能够自定义传参;

并发履行这块规划的很奇妙,仅创立了一个闭包,经过atomic操控idx,防止频繁触发GC;

map

conc库供给的map办法能够得到对切片中元素成果,官方例子:

Go语言实现的可读性更高的并发神库

运用map能够前进代码的可读性,而且减少了冗余代码,自己写个例子:

func main() {
	input := []int{1, 2, 3, 4}
	mapper := iter.Mapper[int, bool]{
		MaxGoroutines: len(input) / 2,
	}
	results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
	fmt.Println(results)
	// Output:
	// [false true false true]
}

map的完成也依赖于Iterator,也是调用的ForEachIdx办法,差异于ForEach是记载处理成果;

总结

花了小半天时间看了一下这个库,很多规划点值得咱们学习,总结一下我学习到的知识点:

  • conc.WatiGroup对Sync.WaitGroup进行了封装,对Add、Done、Recover进行了封装,前进了可读性,防止了冗余代码
  • ForEach、Map办法能够更高雅的并发处理切片,代码简练易读,在完成上Iterator中的并发处理运用atomic来操控只创立一个闭包,防止了GC性能问题
  • pool是一个并发的协程行列,能够操控协程的数量,完成上也很奇妙,运用一个无缓冲的channel作为worker,假如goroutine履行速度快,防止了创立多个goroutine
  • stream是一个确保次序的并发协程行列,完成上也很奇妙,运用sync.Pool在提交goroutine时操控次序,值得咱们学习;

小伙伴们有时间能够看一下这个并发库,学习其中的长处,慢慢前进~

好啦,本文到这儿就完毕了,我是asong,咱们下期见。

欢迎关注大众号:Golang梦工厂