布景
chatgpt的出现可以说对程序员这个工作造成了很大的冲击,除了可以主动生成代码外,也可以了解代码,尝试了下用chatgpt去辅佐阅读源码
以kratos的limit为例
完成
下面我会将bbr limit 中的每个办法直接仿制给chatgpt,让ai进行解析
cpuproc
cpu 运用率
// cpu = cpuᵗ⁻ * decay + cpuᵗ * (1 - decay)
func cpuproc() {
ticker := time.NewTicker(time.Millisecond * 500) // same to cpu sample rate
defer func() {
ticker.Stop()
if err := recover(); err != nil {
go cpuproc()
}
}()
// EMA algorithm: https://blog.csdn.net/m0_38106113/article/details/81542863
for range ticker.C {
stat := &cpu.Stat{}
cpu.ReadStat(stat)
stat.Usage = min(stat.Usage, 1000)
prevCPU := atomic.LoadInt64(&gCPU)
curCPU := int64(float64(prevCPU)*decay + float64(stat.Usage)*(1.0-decay))
atomic.StoreInt64(&gCPU, curCPU)
}
}
这段代码界说了一个名为cpuproc
的函数,该函数会定时地读取当时CPU的运用率并核算依据EMA算法的均匀值。详细而言,这个函数会:
-
创立一个每500毫秒触发一次的定时器
-
在函数回来前停止定时器并检查是否有panic异常,假如有则重新启动cpuproc
-
在定时器触发时,获取当时CPU的运用率并进行“平滑化”处理。详细而言,将当时CPU运用率与上一时刻的均匀运用率按照指数加权移动均匀(EMA)算法的方式进行加权均匀。这里用到了
atomic
包中的LoadInt64()
和StoreInt64()
函数来确保多个goroutine并发履行时均匀运用率的正确性。 -
min(stat.Usage, 1000)
句子的作用是确保CPU运用率不会超越100%(1000为10倍的百分比),由于stat.Usage是一个累积值,可能会大于100%。 -
这个函数会一直循环履行,直到程序被强制退出为止。
Stat
cpu 快照
// Stat contains the metrics snapshot of bbr.
type Stat struct {
CPU int64
InFlight int64
MaxInFlight int64
MinRt int64
MaxPass int64
}
这段代码界说了一个名为Stat
的结构体类型,它包括了对于bbr(依据行列的带宽调节算法)的衡量快照。详细而言,这个结构体包括了以下字段:
-
CPU
字段表明当时CPU的运用率,单位是百分比,默许最大值为100。 -
InFlight
字段表明当时正在处理的恳求数,也便是当时正在占用带宽的恳求数。 -
MaxInFlight
字段表明最大答应处理的恳求数,也便是最大答应占用带宽的恳求数。 -
MinRt
字段表明当时均匀呼应时刻的最小值,单位是纳秒。 -
MaxPass
字段表明当时答应经过的最大QPS(每秒恳求数),单位是恳求数。
BBR
限流器结构体
// BBR implements bbr-like limiter.
// It is inspired by sentinel.
// https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81
type BBR struct {
cpu cpuGetter
passStat window.RollingCounter
rtStat window.RollingCounter
inFlight int64
bucketPerSecond int64
bucketDuration time.Duration
// prevDropTime defines previous start drop since initTime
prevDropTime atomic.Value
maxPASSCache atomic.Value
minRtCache atomic.Value
opts options
}
这段代码界说了一个名为BBR
的结构体类型,表明BBR限流器。该结构体包括以下字段:
-
cpu
字段表明用于获取系统CPU利用率的函数类型。 -
passStat
字段表明用于核算窗口周期内经过的恳求数的计数器。 -
rtStat
字段表明用于核算窗口周期内的恳求呼应时刻的计数器。 -
inFlight
字段表明当时正在处理的恳求数量,它是原子类型,可以安全地被并发修正。 -
bucketPerSecond
字段表明每秒钟分配的桶的数量。 -
bucketDuration
字段表明每个桶的时刻周期,即固定时刻内的恳求限流,它是一个time.Duration
类型的值。 -
prevDropTime
字段表明上一次限流降级开端的时刻。它是atomic.Value
类型,答应线程安全地读写时刻值。 -
maxPASSCache
字段表明窗口周期内最大答应经过数的缓存值。它也是一个atomic.Value
类型。 -
minRtCache
字段表明窗口周期内最小答应呼应时刻的缓存值。它也是一个atomic.Value
类型。 -
opts
字段表明该限流器的选项。它是一个名为options
的结构体类型,包括了各种参数,如窗口巨细、桶的数量、CPU利用率阈值等。
maxPASS
窗口周期内最大答应经过数
func (l *BBR) maxPASS() int64 {
passCache := l.maxPASSCache.Load()
if passCache != nil {
ps := passCache.(*counterCache)
if l.timespan(ps.time) < 1 {
return ps.val
}
}
rawMaxPass := int64(l.passStat.Reduce(func(iterator window.Iterator) float64 {
var result = 1.0
for i := 1; iterator.Next() && i < l.opts.Bucket; i++ {
bucket := iterator.Bucket()
count := 0.0
for _, p := range bucket.Points {
count += p
}
result = math.Max(result, count)
}
return result
}))
l.maxPASSCache.Store(&counterCache{
val: rawMaxPass,
time: time.Now(),
})
return rawMaxPass
}
代码首要尝试从缓存中获取 maxPASS,假如缓存之前已经设置过而且时刻跨度小于 1,那么就直接回来缓存中的值。假如缓存不存在或间隔上次设置时刻大于等于 1,则需求重新核算 maxPASS。
为了核算 maxPASS,代码运用了 passStat 变量的 Reduce 办法,该办法承受一个 window.Iterator 对象,并经过迭代器处理一些核算逻辑,最终回来一个 float64 类型的值。在 Reduce 办法中,代码会遍历迭代器中的数据并核算每个 bucket(桶)中的元素值的和,然后取得这些和的最大值。
核算完 maxPASS 之后,代码将它存储到一个 counterCache 结构体中,并将该结构体设置为缓存的值,以备下次读取。最终,办法回来核算出的 rawMaxPass 值
timespan
// timespan returns the passed bucket count
// since lastTime, if it is one bucket duration earlier than
// the last recorded time, it will return the BucketNum.
func (l *BBR) timespan(lastTime time.Time) int {
v := int(time.Since(lastTime) / l.bucketDuration)
if v > -1 {
return v
}
return l.opts.Bucket
}
该办法的作用是核算自最终一次记载时刻(即 lastTime)到现在经过了多少个 bucket,其间 bucket 的刻度为 l.bucketDuration。
首要,办法核算了现在与 lastTime 的时刻差并除以 l.bucketDuration,这样可以得到穿越的 bucket 数量。然后将这个差值转换成 int 类型,并回来其值。
可是,假如最终一次记载的时刻早于当时时刻的 l.bucketDuration,即 lastTime 与当时时刻相差少于一个 bucket 的时刻,则直接回来 l.opts.Bucket,即默许的 bucket 数量。这个判别是为了避免 timespan 办法回来负数
minRT
最小呼应时刻
func (l *BBR) minRT() int64 {
rtCache := l.minRtCache.Load()
if rtCache != nil {
rc := rtCache.(*counterCache)
if l.timespan(rc.time) < 1 {
return rc.val
}
}
rawMinRT := int64(math.Ceil(l.rtStat.Reduce(func(iterator window.Iterator) float64 {
var result = math.MaxFloat64
for i := 1; iterator.Next() && i < l.opts.Bucket; i++ {
bucket := iterator.Bucket()
if len(bucket.Points) == 0 {
continue
}
total := 0.0
for _, p := range bucket.Points {
total += p
}
avg := total / float64(bucket.Count)
result = math.Min(result, avg)
}
return result
})))
if rawMinRT <= 0 {
rawMinRT = 1
}
l.minRtCache.Store(&counterCache{
val: rawMinRT,
time: time.Now(),
})
return rawMinRT
}
该函数是BBR算法中核算最小RT(最小呼应时刻)的函数。它首要会从缓存中获取最小呼应时刻,假如缓存中的数据未过期,则直接回来缓存中的最小呼应时刻。否则,它会核算最小呼应时刻,经过遍历滑动窗口中的一切数据,核算每个时刻段内的均匀呼应时刻,并选取其间最小的一个。假如最小呼应时刻小于等于0,则最小呼应时刻会被设为1。最终,最小呼应时刻会被保存到缓存中,并回来最小呼应时刻。
maxInFlight
最大并发数
func (l *BBR) maxInFlight() int64 {
return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.bucketPerSecond)/1000.0) + 0.5)
}
该函数是BBR算法中核算最大并发恳求数的函数。它首要调用maxPASS()函数获取最大传输速率,调用minRT()函数获取最小呼应时刻,再乘以每秒的时刻桶数(即bucketPerSecond),再除以1000(单位转换为毫秒),最终向下取整并加上0.5(四舍五入)得到最大并发恳求数。详细核算公式为:最大并发恳求数 = maxPASS * minRT * bucketPerSecond / 1000。最大并发恳求数用于约束当时连接的并发恳求数。
shouldDrop
中心办法
func (l *BBR) shouldDrop() bool {
now := time.Duration(time.Now().UnixNano())
if l.cpu() < l.opts.CPUThreshold {
// current cpu payload below the threshold
prevDropTime, _ := l.prevDropTime.Load().(time.Duration)
if prevDropTime == 0 {
// haven't start drop,
// accept current request
return false
}
if time.Duration(now-prevDropTime) <= time.Second {
// just start drop one second ago,
// check current inflight count
inFlight := atomic.LoadInt64(&l.inFlight)
return inFlight > 1 && inFlight > l.maxInFlight()
}
l.prevDropTime.Store(time.Duration(0))
return false
}
// current cpu payload exceeds the threshold
inFlight := atomic.LoadInt64(&l.inFlight)
drop := inFlight > 1 && inFlight > l.maxInFlight()
if drop {
prevDrop, _ := l.prevDropTime.Load().(time.Duration)
if prevDrop != 0 {
// already started drop, return directly
return drop
}
// store start drop time
l.prevDropTime.Store(now)
}
return drop
}
这是一个 Golang 中的 BBR 拥塞操控算法的完成,依据当时 CPU 负载状况和 inflight count 来判别是否需求进行拥塞操控(即丢掉部分恳求)。详细来说,假如当时 CPU 负载低于阈值,且已经有过一次拥塞操控,则需求检查一下 inflight count,假如超越了最大值,则需求进行拥塞操控;假如当时 CPU 负载高于阈值,则需求检查一下 inflight count 是否超越最大值,假如超越了,则需求进行拥塞操控。拥塞操控进程的完成是将 prevDropTime 存储为 0 或当时的时刻,假如需求拥塞操控,则将 prevDropTime 设置为当时时刻。
Allow
答应经过
// Allow checks all inbound traffic.
// Once overload is detected, it raises limit.ErrLimitExceed error.
func (l *BBR) Allow() (ratelimit.DoneFunc, error) {
if l.shouldDrop() {
return nil, ratelimit.ErrLimitExceed
}
atomic.AddInt64(&l.inFlight, 1)
start := time.Now().UnixNano()
ms := float64(time.Millisecond)
return func(ratelimit.DoneInfo) {
//nolint
rt := int64(math.Ceil(float64(time.Now().UnixNano()-start)) / ms)
l.rtStat.Add(rt)
atomic.AddInt64(&l.inFlight, -1)
l.passStat.Add(1)
}, nil
}
这是一个完成拥塞操控的限流器(即 BBR 算法)的 Allow 办法。Allow 办法会判别当时是否需求进行拥塞操控(即丢掉一部分恳求),假如需求则直接回来 Error。假如不需求进行拥塞操控,则将 inflight 加 1,并记载当时时刻。一起回来一个 done 函数,表明当恳求处理完成后,需求履行的操作。done 函数内部会将 inflight 减 1,并核算本次恳求的处理时刻,并依据处理时刻更新 rtStat 和 passStat。其间,rtStat 记载了每个恳求的处理时刻的散布状况,passStat 记载了总共经过的恳求的数量。
总结
- 运用下来很方便,可以精确的了解函数的意思,进步生产力
- 假如对话前指定上下文环境,ai 会更精确,比如开端前 跟ai说 下面的对话是一段限流算法的源码解析
- 存在一些问题:比如一段时刻未输入会丢掉上下文环境、对于部分代码了解不了
ps: 咱们会被AI替代吗
源码地址
ratelimit bbr: github.com/go-kratos/a…
系统自适应限流:github.com/alibaba/Sen…