Go完结后台使命调度体系
<大众号:仓库future>
一、背景
平常咱们在开发API的时分,前端传递过来的大批数据需求通过后端处理,如果后端处理的速度快,前端呼应就快,反之则很慢,影响用户体会。针对这种场景咱们一般都是后台异步处理,不需求前端等待所有的都履行完才回来。为了处理这一问题,需求咱们自己完结后台使命调度体系。
二、使命调度器完结
poll.go
package poller
import (
"context"
"fmt"
"log"
"sync"
"time"
)
type Poller struct {
routineGroup *goroutineGroup // 并发控制
workerNum int // 记录一起在运转的最大goroutine数
sync.Mutex
ready chan struct{} // 某个goroutine现已预备好了
metric *metric // 计算当时在运转中的goroutine数量
}
func NewPoller(workerNum int) *Poller {
return &Poller{
routineGroup: newRoutineGroup(),
workerNum: workerNum,
ready: make(chan struct{}, 1),
metric: newMetric(),
}
}
// 调度器
func (p *Poller) schedule() {
p.Lock()
defer p.Unlock()
if int(p.metric.BusyWorkers()) >= p.workerNum {
return
}
select {
case p.ready <- struct{}{}: // 只要满足当时goroutine数量小于最大goroutine数量 那么就通知poll去调度goroutine履行使命
default:
}
}
func (p *Poller) Poll(ctx context.Context) error {
for {
// step01
p.schedule() // 调度
select {
case <-p.ready: // goroutine预备好之后 这儿就会有消息
case <-ctx.Done():
return nil
}
LOOP:
for {
select {
case <-ctx.Done():
break LOOP
default:
// step02
task, err := p.fetch(ctx) // 获取使命
if err != nil {
log.Println("fetch task error:", err.Error())
break
}
fmt.Println(task)
p.metric.IncBusyWorker() // 当时正在运转的goroutine+1
// step03
p.routineGroup.Run(func() { // 履行使命
if err := p.execute(ctx, task); err != nil {
log.Println("execute task error:", err.Error())
}
})
break LOOP
}
}
}
}
func (p *Poller) fetch(ctx context.Context) (string, error) {
time.Sleep(1000 * time.Millisecond)
return "task", nil
}
func (p *Poller) execute(ctx context.Context, task string) error {
defer func() {
p.metric.DecBusyWorker() // 履行完结之后 goroutine数量-1
p.schedule() // 从头调度下一个goroutine去履行使命 这一步是有必要的
}()
return nil
}
metric.go
package poller
import "sync/atomic"
type metric struct {
busyWorkers uint64
}
func newMetric() *metric {
return &metric{}
}
func (m *metric) IncBusyWorker() uint64 {
return atomic.AddUint64(&m.busyWorkers, 1)
}
func (m *metric) DecBusyWorker() uint64 {
return atomic.AddUint64(&m.busyWorkers, ^uint64(0))
}
func (m *metric) BusyWorkers() uint64 {
return atomic.LoadUint64(&m.busyWorkers)
}
goroutine_group.go
package poller
import "sync"
type goroutineGroup struct {
waitGroup sync.WaitGroup
}
func newRoutineGroup() *goroutineGroup {
return new(goroutineGroup)
}
func (g *goroutineGroup) Run(fn func()) {
g.waitGroup.Add(1)
go func() {
defer g.waitGroup.Done()
fn()
}()
}
func (g *goroutineGroup) Wait() {
g.waitGroup.Wait()
}
三、测试
package main
import (
"context"
"fmt"
"ta/poller"
"go.uber.org/goleak"
"testing"
)
func TestMain(m *testing.M) {
fmt.Println("start")
goleak.VerifyTestMain(m)
}
func TestPoller(t *testing.T) {
producer := poller.NewPoller(5)
producer.Poll(context.Background())
}
成果:
四、总结
我们用别的方法也能够完结,核心关键便是控制并发节奏,防止很多请求打到task service
,在这儿起到核心作用的便是schedule
,它控制着整个使命体系的调度。一起还封装了WaitGroup
,这在大多数开源代码中都比较常见,我们能够去尝试。另外便是test case
必定得跟上,防止goroutine
走漏。