布景

chatgpt的出现可以说对程序员这个工作造成了很大的冲击,除了可以主动生成代码外,也可以了解代码,尝试了下用chatgpt去辅佐阅读源码

以kratos的limit为例

完成

下面我会将bbr limit 中的每个办法直接仿制给chatgpt,让ai进行解析

cpuproc cpu 运用率

// cpu = cpuᵗ⁻ * decay + cpuᵗ * (1 - decay)
func cpuproc() {
   ticker := time.NewTicker(time.Millisecond * 500) // same to cpu sample rate
   defer func() {
      ticker.Stop()
      if err := recover(); err != nil {
         go cpuproc()
      }
   }()
   // EMA algorithm: https://blog.csdn.net/m0_38106113/article/details/81542863
   for range ticker.C {
      stat := &cpu.Stat{}
      cpu.ReadStat(stat)
      stat.Usage = min(stat.Usage, 1000)
      prevCPU := atomic.LoadInt64(&gCPU)
      curCPU := int64(float64(prevCPU)*decay + float64(stat.Usage)*(1.0-decay))
      atomic.StoreInt64(&gCPU, curCPU)
   }
}

这段代码界说了一个名为cpuproc的函数,该函数会定时地读取当时CPU的运用率并核算依据EMA算法的均匀值。详细而言,这个函数会:

  1. 创立一个每500毫秒触发一次的定时器

  2. 在函数回来前停止定时器并检查是否有panic异常,假如有则重新启动cpuproc

  3. 在定时器触发时,获取当时CPU的运用率并进行“平滑化”处理。详细而言,将当时CPU运用率与上一时刻的均匀运用率按照指数加权移动均匀(EMA)算法的方式进行加权均匀。这里用到了atomic包中的LoadInt64()StoreInt64()函数来确保多个goroutine并发履行时均匀运用率的正确性。

  4. min(stat.Usage, 1000)句子的作用是确保CPU运用率不会超越100%(1000为10倍的百分比),由于stat.Usage是一个累积值,可能会大于100%。

  5. 这个函数会一直循环履行,直到程序被强制退出为止。

Stat cpu 快照

// Stat contains the metrics snapshot of bbr.
type Stat struct {
   CPU         int64
   InFlight    int64
   MaxInFlight int64
   MinRt       int64
   MaxPass     int64
}

这段代码界说了一个名为Stat的结构体类型,它包括了对于bbr(依据行列的带宽调节算法)的衡量快照。详细而言,这个结构体包括了以下字段:

  1. CPU字段表明当时CPU的运用率,单位是百分比,默许最大值为100。

  2. InFlight字段表明当时正在处理的恳求数,也便是当时正在占用带宽的恳求数。

  3. MaxInFlight字段表明最大答应处理的恳求数,也便是最大答应占用带宽的恳求数。

  4. MinRt字段表明当时均匀呼应时刻的最小值,单位是纳秒。

  5. MaxPass字段表明当时答应经过的最大QPS(每秒恳求数),单位是恳求数。

BBR 限流器结构体

// BBR implements bbr-like limiter.
// It is inspired by sentinel.
// https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81
type BBR struct {
   cpu             cpuGetter
   passStat        window.RollingCounter
   rtStat          window.RollingCounter
   inFlight        int64
   bucketPerSecond int64
   bucketDuration  time.Duration
   // prevDropTime defines previous start drop since initTime
   prevDropTime atomic.Value
   maxPASSCache atomic.Value
   minRtCache   atomic.Value
   opts options
}

这段代码界说了一个名为BBR的结构体类型,表明BBR限流器。该结构体包括以下字段:

  1. cpu字段表明用于获取系统CPU利用率的函数类型。

  2. passStat字段表明用于核算窗口周期内经过的恳求数的计数器。

  3. rtStat字段表明用于核算窗口周期内的恳求呼应时刻的计数器。

  4. inFlight字段表明当时正在处理的恳求数量,它是原子类型,可以安全地被并发修正。

  5. bucketPerSecond字段表明每秒钟分配的桶的数量。

  6. bucketDuration字段表明每个桶的时刻周期,即固定时刻内的恳求限流,它是一个time.Duration类型的值。

  7. prevDropTime字段表明上一次限流降级开端的时刻。它是atomic.Value类型,答应线程安全地读写时刻值。

  8. maxPASSCache字段表明窗口周期内最大答应经过数的缓存值。它也是一个atomic.Value类型。

  9. minRtCache字段表明窗口周期内最小答应呼应时刻的缓存值。它也是一个atomic.Value类型。

  10. opts字段表明该限流器的选项。它是一个名为options的结构体类型,包括了各种参数,如窗口巨细、桶的数量、CPU利用率阈值等。

maxPASS 窗口周期内最大答应经过数

func (l *BBR) maxPASS() int64 {
   passCache := l.maxPASSCache.Load()
   if passCache != nil {
      ps := passCache.(*counterCache)
      if l.timespan(ps.time) < 1 {
         return ps.val
      }
   }
   rawMaxPass := int64(l.passStat.Reduce(func(iterator window.Iterator) float64 {
      var result = 1.0
      for i := 1; iterator.Next() && i < l.opts.Bucket; i++ {
         bucket := iterator.Bucket()
         count := 0.0
         for _, p := range bucket.Points {
            count += p
         }
         result = math.Max(result, count)
      }
      return result
   }))
   l.maxPASSCache.Store(&counterCache{
      val:  rawMaxPass,
      time: time.Now(),
   })
   return rawMaxPass
}

代码首要尝试从缓存中获取 maxPASS,假如缓存之前已经设置过而且时刻跨度小于 1,那么就直接回来缓存中的值。假如缓存不存在或间隔上次设置时刻大于等于 1,则需求重新核算 maxPASS。

为了核算 maxPASS,代码运用了 passStat 变量的 Reduce 办法,该办法承受一个 window.Iterator 对象,并经过迭代器处理一些核算逻辑,最终回来一个 float64 类型的值。在 Reduce 办法中,代码会遍历迭代器中的数据并核算每个 bucket(桶)中的元素值的和,然后取得这些和的最大值。

核算完 maxPASS 之后,代码将它存储到一个 counterCache 结构体中,并将该结构体设置为缓存的值,以备下次读取。最终,办法回来核算出的 rawMaxPass 值

timespan

// timespan returns the passed bucket count
// since lastTime, if it is one bucket duration earlier than
// the last recorded time, it will return the BucketNum.
func (l *BBR) timespan(lastTime time.Time) int {
   v := int(time.Since(lastTime) / l.bucketDuration)
   if v > -1 {
      return v
   }
   return l.opts.Bucket
}

该办法的作用是核算自最终一次记载时刻(即 lastTime)到现在经过了多少个 bucket,其间 bucket 的刻度为 l.bucketDuration。

首要,办法核算了现在与 lastTime 的时刻差并除以 l.bucketDuration,这样可以得到穿越的 bucket 数量。然后将这个差值转换成 int 类型,并回来其值。

可是,假如最终一次记载的时刻早于当时时刻的 l.bucketDuration,即 lastTime 与当时时刻相差少于一个 bucket 的时刻,则直接回来 l.opts.Bucket,即默许的 bucket 数量。这个判别是为了避免 timespan 办法回来负数

minRT 最小呼应时刻

func (l *BBR) minRT() int64 {
   rtCache := l.minRtCache.Load()
   if rtCache != nil {
      rc := rtCache.(*counterCache)
      if l.timespan(rc.time) < 1 {
         return rc.val
      }
   }
   rawMinRT := int64(math.Ceil(l.rtStat.Reduce(func(iterator window.Iterator) float64 {
      var result = math.MaxFloat64
      for i := 1; iterator.Next() && i < l.opts.Bucket; i++ {
         bucket := iterator.Bucket()
         if len(bucket.Points) == 0 {
            continue
         }
         total := 0.0
         for _, p := range bucket.Points {
            total += p
         }
         avg := total / float64(bucket.Count)
         result = math.Min(result, avg)
      }
      return result
   })))
   if rawMinRT <= 0 {
      rawMinRT = 1
   }
   l.minRtCache.Store(&counterCache{
      val:  rawMinRT,
      time: time.Now(),
   })
   return rawMinRT
}

该函数是BBR算法中核算最小RT(最小呼应时刻)的函数。它首要会从缓存中获取最小呼应时刻,假如缓存中的数据未过期,则直接回来缓存中的最小呼应时刻。否则,它会核算最小呼应时刻,经过遍历滑动窗口中的一切数据,核算每个时刻段内的均匀呼应时刻,并选取其间最小的一个。假如最小呼应时刻小于等于0,则最小呼应时刻会被设为1。最终,最小呼应时刻会被保存到缓存中,并回来最小呼应时刻。

maxInFlight 最大并发数

func (l *BBR) maxInFlight() int64 {
   return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.bucketPerSecond)/1000.0) + 0.5)
}

该函数是BBR算法中核算最大并发恳求数的函数。它首要调用maxPASS()函数获取最大传输速率,调用minRT()函数获取最小呼应时刻,再乘以每秒的时刻桶数(即bucketPerSecond),再除以1000(单位转换为毫秒),最终向下取整并加上0.5(四舍五入)得到最大并发恳求数。详细核算公式为:最大并发恳求数 = maxPASS * minRT * bucketPerSecond / 1000。最大并发恳求数用于约束当时连接的并发恳求数。

shouldDrop 中心办法

func (l *BBR) shouldDrop() bool {
   now := time.Duration(time.Now().UnixNano())
   if l.cpu() < l.opts.CPUThreshold {
      // current cpu payload below the threshold
      prevDropTime, _ := l.prevDropTime.Load().(time.Duration)
      if prevDropTime == 0 {
         // haven't start drop,
         // accept current request
         return false
      }
      if time.Duration(now-prevDropTime) <= time.Second {
         // just start drop one second ago,
         // check current inflight count
         inFlight := atomic.LoadInt64(&l.inFlight)
         return inFlight > 1 && inFlight > l.maxInFlight()
      }
      l.prevDropTime.Store(time.Duration(0))
      return false
   }
   // current cpu payload exceeds the threshold
   inFlight := atomic.LoadInt64(&l.inFlight)
   drop := inFlight > 1 && inFlight > l.maxInFlight()
   if drop {
      prevDrop, _ := l.prevDropTime.Load().(time.Duration)
      if prevDrop != 0 {
         // already started drop, return directly
         return drop
      }
      // store start drop time
      l.prevDropTime.Store(now)
   }
   return drop
}

这是一个 Golang 中的 BBR 拥塞操控算法的完成,依据当时 CPU 负载状况和 inflight count 来判别是否需求进行拥塞操控(即丢掉部分恳求)。详细来说,假如当时 CPU 负载低于阈值,且已经有过一次拥塞操控,则需求检查一下 inflight count,假如超越了最大值,则需求进行拥塞操控;假如当时 CPU 负载高于阈值,则需求检查一下 inflight count 是否超越最大值,假如超越了,则需求进行拥塞操控。拥塞操控进程的完成是将 prevDropTime 存储为 0 或当时的时刻,假如需求拥塞操控,则将 prevDropTime 设置为当时时刻。

Allow 答应经过

// Allow checks all inbound traffic.
// Once overload is detected, it raises limit.ErrLimitExceed error.
func (l *BBR) Allow() (ratelimit.DoneFunc, error) {
   if l.shouldDrop() {
      return nil, ratelimit.ErrLimitExceed
   }
   atomic.AddInt64(&l.inFlight, 1)
   start := time.Now().UnixNano()
   ms := float64(time.Millisecond)
   return func(ratelimit.DoneInfo) {
      //nolint
      rt := int64(math.Ceil(float64(time.Now().UnixNano()-start)) / ms)
      l.rtStat.Add(rt)
      atomic.AddInt64(&l.inFlight, -1)
      l.passStat.Add(1)
   }, nil
}

这是一个完成拥塞操控的限流器(即 BBR 算法)的 Allow 办法。Allow 办法会判别当时是否需求进行拥塞操控(即丢掉一部分恳求),假如需求则直接回来 Error。假如不需求进行拥塞操控,则将 inflight 加 1,并记载当时时刻。一起回来一个 done 函数,表明当恳求处理完成后,需求履行的操作。done 函数内部会将 inflight 减 1,并核算本次恳求的处理时刻,并依据处理时刻更新 rtStat 和 passStat。其间,rtStat 记载了每个恳求的处理时刻的散布状况,passStat 记载了总共经过的恳求的数量。

总结

  • 运用下来很方便,可以精确的了解函数的意思,进步生产力
  • 假如对话前指定上下文环境,ai 会更精确,比如开端前 跟ai说 下面的对话是一段限流算法的源码解析
  • 存在一些问题:比如一段时刻未输入会丢掉上下文环境、对于部分代码了解不了

ps: 咱们会被AI替代吗

源码地址

ratelimit bbr: github.com/go-kratos/a…

系统自适应限流:github.com/alibaba/Sen…