作者:binbin0325

布景介绍

Sentinel Go-毫秒级统计数据结构揭秘

随着微服务的盛行,服务和服务之间的稳定性变得越来越重要。在 2020 年,Sentinel 社区推出 Sentinel Go 版别,朝着云原生方向演进。Sentinel Go 是一个流量治理组件,主要以流量为切入点,从流量路由、流量操控、流量整形、熔断降级、系统自适应过载保护、热门流量防护等多个维度来协助开发者保证微服务的稳定性。

无论是流量操控仍是熔断降级,完结的中心思维都是经过核算一段时刻内的目标数据(恳求数/错误数等),然后依据预选设定的阈值判别是否应该进行流量管控

那么怎么存储并核算这一段时刻内的目标数据则是中心关键,本文将揭秘 Sentienl-Go 是怎么完结的毫秒级目标数据存储与核算

固定窗口

在正式介绍之前,先简略介绍一下固定窗口的算法(也叫计数器算法)是完结流量操控比较简略的一种方法。其他常见的还有许多例如滑动时刻窗口算法,漏桶算法,令牌桶算法等等。

固定窗口算法一般是经过原子操作将恳求在核算周期内进行累加,然后当恳求数大于阈值时进行限流。

完结代码:

var (
    counter    int64 //计数
    intervalMs int64 = 1000 //窗口长度(1S)
    threshold  int64 = 2 //限流阈值
    startTime        = time.Now().UnixMilli() //窗口开端时刻
)
func main() {
    for i := 0; i < 10; i++ {
       if tryAcquire() {
          fmt.Println("成功恳求", time.Now().Unix())
      }
   }
}
func tryAcquire() bool {
    if time.Now().UnixMilli()-atomic.LoadInt64(&startTime) > intervalMs {
       atomic.StoreInt64(&startTime, time.Now().UnixMilli())
       atomic.StoreInt64(&counter, 0)
   }
    return atomic.AddInt64(&counter, 1) <= threshold
}

固定窗口的限流在完结上看起来比较简略简单,但是也有一些问题,最典型的便是“鸿沟”问题。

如下图:核算周期为 1S,限流阈值是 2 的状况下,假设 4 次恳求刚好“跨过”了固定的时刻窗口,如红色的 1SS 时刻窗口所示会有四次恳求,显着不符合限流的预期。

Sentinel Go-毫秒级统计数据结构揭秘

滑动时刻窗口

在滑动时刻窗口算法中能够处理固定窗口算法的鸿沟问题,在滑动窗口算法中一般有两个比较重要的概念

  • 核算周期:例如想约束 5S 的恳求数不能超过 100 次,那么 5S 便是核算周期
  • 窗口(格子)的巨细:一个周期内会有多个窗口(格子)进行目标(例如恳求数)的核算,长度持平的核算周期,格子的数量越多,核算的越准确

如下所示:核算周期为 1S,每个周期内分为两个格子,每个格子的长度是 500ms。

Sentinel Go-毫秒级统计数据结构揭秘

在滑动窗口中核算周期以及窗口的巨细,需求依据事务状况进行设定。

核算周期共同,窗口巨细不共同:窗口越大核算精准度越低,但并发功能好,越小:核算精准度越高,并发功能随之降低;

核算周期不共同,窗口巨细共同:周期越长抗流量脉冲状况越好。

核算结构

下面将详细介绍 Sentinel-Go 是怎么运用滑动时刻窗口高效的存储和核算目标数据的。

窗口结构

在滑动时刻窗口中时刻很重要。每个窗口(BocketWrap)的组成是由一个开端时刻和一个笼统的核算结构:

type BucketWrap struct {
   // BucketStart represents start timestamp of this statistic bucket wrapper.
   BucketStart uint64
   // Value represents the actual data structure of the metrics (e.g. MetricBucket).
   Value atomic.Value
}

开端时刻:当时格子的的起始时刻

核算结构:存储目标数据,原子操作并发安全

如下图:核算周期 1S,每个窗口的长度是 200ms。

Sentinel Go-毫秒级统计数据结构揭秘

目标数据:

  1. pass: 表明到来的数量,即此时经过 Sentinel-Go 规矩的流量数量
  2. block: 表明被拦截的流量数量
  3. complete: 表明完结的流量数量,包含正常完毕和反常完毕的状况
  4. error: 表明错误的流量数量(熔断场景运用)
  5. rt:单次恳求的 request time
  6. total:暂时无用

原子时刻轮

如上:整个核算周期内有多个时刻窗口,在 Sentinel-Go 中核算周期是由 slice 完结的,每个元素对应一个窗口。

在上面介绍了为了处理鸿沟问题,滑动时刻窗口核算的进程需求向右滑动。随时时刻的推移,无限的向右滑动,势必会让 slice 继续的扩张,导致 slice 的容量“无限”增长。

Sentinel Go-毫秒级统计数据结构揭秘

为了处理这个问题,在 Sentinel-Go 中完结了一个时刻轮的概念,经过固定 slice 长度将过期的时刻窗口重置,节省空间。

Sentinel Go-毫秒级统计数据结构揭秘

如下:原子时刻轮数据结构

type AtomicBucketWrapArray struct {
   // The base address for real data array
   base unsafe.Pointer // 窗口数组首元素地址
   // The length of slice(array), it can not be modified.
   length int // 窗口数组的长度
   data   []*BucketWrap //窗口数组
}
  • 初始化

1: 依据当时时刻核算出当时时刻对应的窗口的 startime,并得到当时窗口对应的方位

// 核算开端时刻
func calculateStartTime(now uint64, bucketLengthInMs uint32) uint64 {
   return now - (now % uint64(bucketLengthInMs))
}
// 窗口下标方位
idx := int((now / uint64(bucketLengthInMs)) % uint64(len))

Sentinel Go-毫秒级统计数据结构揭秘

2:初始化窗口数据结构(BucketWrap)

for i := idx; i <= len-1; i++ {
   ww := &BucketWrap{
      BucketStart: startTime,
      Value:       atomic.Value{},
   }
   ww.Value.Store(generator.NewEmptyBucket())
   ret.data[i] = ww
   startTime += uint64(bucketLengthInMs)
}
for i := 0; i < idx; i++ {
   ww := &BucketWrap{
      BucketStart: startTime,
      Value:       atomic.Value{},
   }
   ww.Value.Store(generator.NewEmptyBucket())
   ret.data[i] = ww
   startTime += uint64(bucketLengthInMs)
}

Sentinel Go-毫秒级统计数据结构揭秘

3:将窗口数组首元素地址设置到原子时刻轮

// calculate base address for real data array
sliHeader := (*util.SliceHeader)(unsafe.Pointer(&ret.data))
ret.base = unsafe.Pointer((**BucketWrap)(unsafe.Pointer(sliHeader.Data)))

假如对unsafe.Pointer和slice熟悉的同学,对于这段代码不难理解。这里经过unsafe.Pointer 将底层 slice 首元素(第一个窗口)地址设置到原子时刻轮中。这么做的原因主要是完结对时刻轮中的元素(窗口)进行原子无锁的读取和更新

  • 窗口获取&窗口替换

怎么在并发安全的状况下读取窗口和对窗口进行替换(时刻轮涉及到对窗口更新操作)代码如下:

// 获取对应窗口
func (aa *AtomicBucketWrapArray) get(idx int) *BucketWrap {
   // aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]
   // then convert to (*unsafe.Pointer)
   if offset, ok := aa.elementOffset(idx); ok {
      return (*BucketWrap)(atomic.LoadPointer((*unsafe.Pointer)(offset)))
   }
   return nil
}
// 替换对应窗口
func (aa *AtomicBucketWrapArray) compareAndSet(idx int, except, update *BucketWrap) bool {
   // aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]
   // then convert to (*unsafe.Pointer)
   // update secondary pointer
   if offset, ok := aa.elementOffset(idx); ok {
      return atomic.CompareAndSwapPointer((*unsafe.Pointer)(offset), unsafe.Pointer(except), unsafe.Pointer(update))
   }
   return false
}
// 获取对应窗口的地址
func (aa *AtomicBucketWrapArray) elementOffset(idx int) (unsafe.Pointer, bool) {
   if idx >= aa.length || idx < 0 {
      logging.Error(errors.New("array index out of bounds"),
         "array index out of bounds in AtomicBucketWrapArray.elementOffset()",
         "idx", idx, "arrayLength", aa.length)
      return nil, false
   }
   basePtr := aa.base
   return unsafe.Pointer(uintptr(basePtr) + uintptr(idx)*unsafe.Sizeof(basePtr)), true
}

获取窗口

  1. 在 get func 中接收依据当时时刻核算出的窗口对应下标方位
  2. 依据下标方位在 elementOffset func 中,首先将底层的 slice 首元素地址转换成 uintptr,然后将窗口对应下标*对应的指针字节巨细即能够得到对应窗口元素的地址
  3. 将对应窗口地址转换成时刻窗口(*BucketWarp)即可

Sentinel Go-毫秒级统计数据结构揭秘

窗口更新

和获取窗口一样,获取到对应下标方位的窗口地址,然后利用 atomic.CompareAndSwapPointer 进行 cas 更新,将新的窗口指针地址更新到底层数组中。

滑动窗口

在原子时刻轮中提供了对窗口读取以及更新的操作。那么在什么机遇触发更新以及怎么滑动?

  • 滑动

所谓滑动便是依据当时时刻找到整个核算周期的一切窗口中的数据。例如在限流场景下,咱们需求获取核算周期内的一切 pass 的流量,从而来判别当时流量是否应该被限流。

中心代码:

// 依据当时时刻获取周期内的一切窗口
func (m *SlidingWindowMetric) getSatisfiedBuckets(now uint64) []*BucketWrap {
   start, end := m.getBucketStartRange(now)
   satisfiedBuckets := m.real.ValuesConditional(now, func(ws uint64) bool {
      return ws >= start && ws <= end
   })
   return satisfiedBuckets
}
// 依据当时时刻获取整个周期对应的窗口的开端时刻和完毕时刻
func (m *SlidingWindowMetric) getBucketStartRange(timeMs uint64) (start, end uint64) {
   curBucketStartTime := calculateStartTime(timeMs, m.real.BucketLengthInMs())
   end = curBucketStartTime
   start = end - uint64(m.intervalInMs) + uint64(m.real.BucketLengthInMs())
   return
}
// 匹配符合条件的窗口
func (la *LeapArray) ValuesConditional(now uint64, predicate base.TimePredicate) []*BucketWrap {
   if now <= 0 {
      return make([]*BucketWrap, 0)
   }
   ret := make([]*BucketWrap, 0, la.array.length)
   for i := 0; i < la.array.length; i++ {
      ww := la.array.get(i)
      if ww == nil || la.isBucketDeprecated(now, ww) || !predicate(atomic.LoadUint64(&ww.BucketStart)) {
         continue
      }
      ret = append(ret, ww)
   }
   return ret
}

如下图所示:核算周期=1000ms(跨两个格子),now=1300 时 核算出 start=500,end=1000

Sentinel Go-毫秒级统计数据结构揭秘

那么在核算周期内的 pass 数量时,会依据如下条件遍历格子,也就会找到开端时刻是 500 和 1000 的两个格子,那么核算的时候 1000 的这个格子中的数据自然也会被核算到。(当时时刻 1300,在 1000 的这个格子中)


satisfiedBuckets := m.real.ValuesConditional(now, func(ws uint64) bool { 
     return ws >= start && ws <= end 
   })
  • 更新

每次流量经过时都会进行相应的目标存储,在存储时会先获取对应的窗口,然后会依据窗口的开端时刻进行对比,假如过期则进行窗口重置。

如下图:依据窗口开端时刻匹配发现 0 号窗口已过期。

Sentinel Go-毫秒级统计数据结构揭秘

如下图:重置窗口的开端时刻和核算目标。

Sentinel Go-毫秒级统计数据结构揭秘

中心代码:

func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
   // 核算当时时刻对应的窗口下标
   idx := la.calculateTimeIdx(now)
   // 核算当时时刻对应的窗口的开端时刻
   bucketStart := calculateStartTime(now, la.bucketLengthInMs)
   for {
     // 获取旧窗口
      old := la.array.get(idx)
      // 假如旧窗口==nil则初始化(正常不会履行这部分代码)
      if old == nil {
         newWrap := &BucketWrap{
            BucketStart: bucketStart,
            Value:       atomic.Value{},
         }
         newWrap.Value.Store(bg.NewEmptyBucket())
         if la.array.compareAndSet(idx, nil, newWrap) {
            return newWrap, nil
         } else {
            runtime.Gosched()
         }
      // 假如本次核算的开端时刻等于旧窗口的开端时刻,则以为窗口没有过期,直接返回
      } else if bucketStart == atomic.LoadUint64(&old.BucketStart) {
         return old, nil
      //  假如本次核算的开端时刻大于旧窗口的开端时刻,则以为窗口过期尝试重置
      } else if bucketStart > atomic.LoadUint64(&old.BucketStart) {
         if la.updateLock.TryLock() {
            old = bg.ResetBucketTo(old, bucketStart)
            la.updateLock.Unlock()
            return old, nil
         } else {
            runtime.Gosched()
         }
        ......
      } 
}

总结

经过上面的介绍能够了解到在 Sentienl-Go 中完结底层目标的核算代码量并不多,本质是经过“时刻轮”进行目标的数据核算和存储,在时刻轮中学习 slice 的底层完结利用 unsafe.Pointer 和 atomic 合作对时刻轮进行无锁的原子操作,极大的提升了功能。

Sentinel-GO 全体的数据结构图:

Sentinel Go-毫秒级统计数据结构揭秘

作者介绍:

张斌斌(Github账号:binbin0325,公众号:柠檬汁Code),Sentinel-Golang Committer 、ChaosBlade Committer 、 Nacos PMC 、Apache Dubbo-Go Committer。现在主要重视于混沌工程、中间件以及云原生方向。

文章参考:

《golang unsafe.Pointer 运用原则以及 uintptr 躲藏的坑》

louyuting.blog.csdn.net/article/det…