作者:张宇轩,章逸,曾丹

初识 Scheduler

找准定位:分布式使命调度渠道

无论是互联网运用或者企业级运用,都充斥着很多的使命。咱们常常需求一些使命调度体系协助咱们解决问题。跟着微服务化架构的逐步演进,单体架构逐步演变为分布式、微服务架构。在此的布景下,许多原先的单点式使命调度渠道已经不能满意事务体系的需求。所以出现了一些依据分布式的使命调度渠道。

Scheduler 是飞书内的分布式使命调度渠道。分布式使命调度才能主要包括:

  • 分布式:渠道是分布式部署的,各个节点之间能够无状况和无限的水平扩展(确保可扩展);
  • 使命调度:涉及到使命状况办理、使命调度恳求的发送与接纳、详细使命的分配、使命的详细履行;(集群中哪些机器什么时分履行什么使命,所以又需求一个能够感知整个集群运转状况的装备中心);
  • 装备中心:能够感知整个集群的状况、使命信息的注册。

摸清头绪:Scheduler 的结构和中心模块

名词解释:

  • Processor: 编程处理器, 具有必定的编程标准, 用户自定义完成。

  • Executor: 一个 SDK,运转 Processor 的进行容器,与 Scheduler 通信的载体。

  • Job:用户创立的使命,其间包括使命的调度规矩、调度模型、履行器名称等信息。

  • Instance:运转态的Job,每逢Job触发后会生成一个Instance,记载本次履行的调度信息。

  • Task:最小履行单元,不同调度模型的使命产生的Task数量不同。

如何做好分布式任务调度——Scheduler 的一些探索

经过架构图能够发现,Scheduler 主要有以下三个部分:

  1. 调度器 (Scheduler):使命调度中心,担任办理使命的生命周期。接受使命注册,按时精确找出待触发的使命,进行使命拆分下发。找出与之关联的履行器并下发对应使命

  2. 履行器 (Executor):接纳调度使命,并将本身状况上报给调度器

  3. 操控台 (Web 前端):担任装备履行器的信息以及调度使命的装备、使命状况、信息展现

因而,咱们能够用一句话解释清楚 Scheduler 所做的作业,即: 在「指守时刻」「告诉履行器」以「指定办法」履行使命 这句话中包括了三个关键点,也别离代表着 Scheduler 的三个中心模块:

  1. 指守时刻:使命的触发规矩,如:每天早上8点、每周二、每月15号等。触发器模块(Launcher Cron)担任使命触发。
  1. 指定办法:使命的履行办法,如:单播使命-指定一个机器履行;广播使命-指定一切机器履行;分片使命-使命分阶段分批的履行。分配器模块(Assignment Cron)担任使命的履行办法。
  1. 告诉履行器:将使命发送到指定履行器,履行使命。差遣器模块(Dispatcher Cron)担任使命的发送,选用流式通信,调度器以推送的办法将使命发送给履行器。

在一个 Job 的调度周期中,各个模块各司其职,整个流程如下:

如何做好分布式任务调度——Scheduler 的一些探索

具有这三个中心模块后,Scheduler 已具备了老练的使命调度功用。别的,为了添加 Scheduler 的稳定性,有额定两个模块为其保驾护航:

  1. 健康办理模块 (Service Health Cron): 担任办理 Job 的生命周期,检测未正常派发履行的 Job、Instance 和 Task,并将成果上报给运维人员。
  1. 使命进展刷新模块 (Task Cron): 异步更新 Task 状况,流量较高时进行削峰,确保依靠的 mysql 及 redis 不由于流量过高而出现问题。

本篇文章不对 Scheduler 所支撑的守时使命才能作赘述,而是从三个方面(易用性多功用性稳定性)介绍 Scheduler 关于分布式使命调度的考虑和探究:

  1. 「易用性」: 决议了用户是否挑选运用该结构的意愿,一个好的结构必须是易用且快速接入的;
  1. 「多功用性」: 接入方需求多种多样,要站在用户视点想问题,不能闭门造车;
  1. 「稳定性」: 关于分布式使命调度渠道来说,不仅仅局限于本身的稳定性,接入方的稳定性也十分重要。

换位考虑-快速接入

布景:功率至上,时刻是金

以字节跳动内部为例,当时团队想要完成一个守时使命有多种办法:接入字节云的 cronjob 渠道、自己完成一套守时使命结构或者接入第三方守时使命结构。

关于榜首种接入 cronjob 渠道,每一种守时使命都需求注册各自的 psm 和运转时环境(镜像),当使命需求拜访依靠资源如 redis/db 等时,需求各自添加授权。使命代码逻辑有变化时也需求各自晋级,导致开发、办理起来较为杂乱。

关于第二种自己完成一套守时使命结构,不仅全体开发时刻较长,且需求很多时刻进行测试回归来确保结构的稳定性。如果项目内运用到的守时使命较多,那么本身研制一套结构用途也较广泛;若项目中运用到的守时使命较少,则 ROI 较低,许多时分也只是为了造轮子而造轮子。

因而,大多数项目面对添加守时使命的需求时,都会寻求直接接入第三方老练的守时使命结构。关于他们来说,是否易于接入、与现有代码联系是否严密、调试是否方便是很重要的选取目标。

依据这种布景,Scheduler 在规划时就站在了接入方的视点,考虑了如何让接入方能够在最短时刻内以最低本钱接入 Scheduler,完成自己的守时使命。

剖析:站在用户视点想问题

站在接入方视点,对守时使命结构进行选型时最关注的几个点无非是守时使命履行精确性、最高支撑 qps、守时设置多样性、接入本钱这几个。关于前两个目标,Scheduler 现在接入事务方 50+,日均调度使命 20w+ 次,与公司内其他第三方守时使命结构相比也较有竞争性,一起关于后两个关注点,Scheduler 也有自己的风格。

丰厚的调度设置

一般的守时使命结构只支撑 crontab 表达式,例如 0 1 * * * ,代表每天清晨一点履行一次。cronTab 功用强壮,可是若装备杂乱的守时战略,有必定学习本钱,且可读性不高。因而,鉴于这种情况,Scheduler 在 crontab 之上规划了更易读更强壮的守时战略,做到所见即所得。

{
    "startTime":1648029600000,
    "timeZone":"Asia/Shanghai",
    "repeatLevel":"month",
    "repeatInterval":2,
    "repeatDays":[3,5,23]
}
  • startTime: 开始时刻戳,在此之前守时使命不会履行,抵达该时刻后会履行榜初次
  • timeZone: 时区设置,依据当时设置的时区精确派发
  • repeatLevel: 重复等级,现在能够设置按「小时」、「天」、「月」、「周」、「年」以及「作业日」进行重复
  • repeatInterval: 重复距离,代表每隔 $repertInterval $repeatLevel 履行一次
  • repeatDays: 重复天数,重复等级是周或月时生效

因而,该设置所代表的守时刻隔为 每两月的3、5、23号触发一次

触发时刻(北京时刻)
2022-03-23 18:00:00
2022-05-03 18:00:00
2022-05-05 18:00:00
2022-05-23 18:00:00

为什么要做作业日调度

或许有同学注意到,Scheduler 关于重复等级的支撑十分丰厚,不仅能够依照普通的年、月、日等等级进行设置,还能够依照作业日进行重复调度(例如每两个作业日履行一次),这归因在 Scheduler 孵化于字节跳动内部企业服务体系,为比方人事体系、权限体系等 ToB 服务供给守时使命才能。往往 ToB 客户的需求杂乱多变,因而,需求提早具备更多才能,才能更好地服务好 ToB 客户。

Scheduler 在调研接入方需求时,得到了有些客户关于守时提醒这类使命的需求是尽量不要在「非作业日」打扰。所以,Scheduler 决议添加作业日调度选项来适配客户潜在需求,也侧面说明晰 Scheduler 为了让接入方更快更小本钱接入做出的努力。

轻松的运用办法

相信「开箱即用」关于人们在采买比方家电、数码产品时,是十分重要的一个考核目标。而关于对外供给的服务 or 结构,亦是如此。Scheduler 的方针便是让接入方能够在短时刻了解 Scheduler、编写测试代码以及上线守时使命。

专心事务

如果想完成一个守时使命,接入方只需求三步:引进 Scheduler sdk,绑定相应 processor,在 process 接口中完成详细事务逻辑。一起,由于守时使命的完成坐落原代码中,发动装备无需更改,本地测试也较为便捷。一起,在字节跳动环境下,无需新增 psm、授权装备等,尽或许做到了「开箱即用」。

import (
   "context"
   "code.byted.org/apaas/scheduler_sdk/executor"  // 引进 sdk
)
func main() {
    executorSvc, err := executor.NewExecutor(executor.NewDefaultExecutorConfig(), 
    &HelloWorld{}) // 绑定 processor
    if err != nil {
       panic(err)
    }
    if err = executorSvc.Run(); err != nil {
       panic(err)
    }
}
type HelloWorld struct {
   ProcessorApiName string
}
func (h *HelloWorld) GetApiName() string {
   return h.ProcessorApiName
}
// process 中完成详细事务逻辑
func (h *HelloWorld) SimpleProcess(ctx context.Context, tc *executor.TaskContext) (err error) {
   tc.LogInfo(ctx, "hello world")
   return
}

快速运维

没有程序员想主动写出 bug,但问题总是会突然出现。如何在出现问题时快速运维、快速止损,是一切工程师都追求的方针。Scheduler 在这方面做了几种测验。

  1. 报警更直观

用户能够在创立 job 时,能够挑选装备报警机器人,并把 Scheduler 机器人拉入对应报警群组。当检测到对应 job 出现问题时,Scheduler 机器人会把相应报警推送到对应群组,做到实时呼应。

  1. 状况更明晰

现在 task 的相关状况如下,当一个 task 长时刻没有到终态时,依据状况码即可知 task 现在处于什么状况,然后推断是哪一步骤出了问题。

状况码 状况
100 等候触发
101 Ready 就绪态,等候推送
201 推送到 Executor,还未实践履行(使命太多排队)
202 履行中
203 履行超时,逻辑杂乱导致
301 履行成功
302 履行失利
401 Ready 超时,没有 Executor 拉取
402 推送到 Executor 后长时刻未履行
403 履行超时,Executor 宕机导致

而且一些 Scheduler 常见的报错也做了封装,协助快速定位问题,例如

过错码 过错原因
k_sc_ec_000004 找不到使命{{.jobApiName}}
k_sc_ec_100004 找不到使命实例{{.instanceID}}
k_sc_ec_300001 Processor Name 未注册{{.content}}
k_sc_ec_300006 processor({{.content}}) 找不到对应 executor
k_sc_ec_400002 找不到Executor {{.content}}
k_sc_ec_400005 无权限操作

并肩作战-分片使命

布景:使命越多,挑战越大

一个老练的项目中避免不了大型批量使命,比方经过 Excel、csv 或其他数据源批量创立或更新数据,批量使命一般数据量很大,如果依照单实例串行履行,那么不能充分利用核算机资源且一次运转会消费很多时刻,用户体会不友好。

以 Kunlun 举例, 旧阶段的批量使命依靠于音讯行列、Redis 完成,全体分为三大部分:

  1. 解析并校验 Excel,将数据解析成一条条数据,将每条音讯封装成一条音讯发送至音讯行列。
  1. 消费音讯行列,进行创立、更新等操作,并在 Redis 记载全体进展并推送给用户,如果使命失利,会将行数和过错原因一起记载进 redis。
  1. 待一切数据处理结束后,如果 redis 中没有过错数据,则提示用户成功,否则依据过错信息生成 Excel 回来给用户。

如何做好分布式任务调度——Scheduler 的一些探索

运用音讯行列、redis 的守时使命能够提速和优化用户体会,但有以下缺乏:
  1. 维护起来不方便,例如当有新服务需求此类功用时,需求自己再完成一套差不多的结构,所以需求将分片功用托管到第三方服务,而事务方只用专心于详细事务。
  1. 依靠于音讯行列和 Redis 两个外部组件,对两个组件的稳定性要求极高,当其间一个出现问题,都会带来不小的费事。

依据这种布景,Scheduler 丰厚了原本的使命调度才能,补充了分片才能,以满意杂乱繁琐的使命分片处理的需求。

剖析:旧问题,新解法

若打算做出一套贴合事务需求的分片使命结构,需求先了解现阶段的分片使命的完成步骤。

现阶段的分片使命大致能够笼统成3个步骤:

  1. 。获取数据,能够从上传的文件解析数据、从 DB 查询出大批数据或其他数据源。
  1. 。处理数据,聚集于详细事务,如:创立、查询、更新。
  1. 。处理成果,将此次使命运转成果处理成成果报告回来给用户,报告能够为 Excel、一条音讯、一封邮件等。

Scheduler 要做的作业则是替换其间分片、音讯行列、Redis 的功用,做出以下笼统:

type ShardingProcessor interface {
   PreProcess(ctx context.Context, tc *TaskContext) error
   ShardingProcess(ctx context.Context, tc *TaskContext) error
   Notify(ctx context.Context, tc *TaskContext)error
   PostProcess(ctx context.Context, tc *TaskContext) error
}
  1. PreProcess(总) :数据准备阶段。可在此办法中对数据进行额定处理,如核算拓扑联系,定义数据优先级。单机运转。
  1. ShardingProcess(分) :分片处理阶段。实践处理函数,多机运转。分片处理函数,履行批量导入、更新等处理。(入参:Scheduler 对 PreProcess 回来成果的分片子参数)
  1. Notify(阶段式-总) :进展更新处理,每逢进展改变的大小大于设定阈值,则生成一次 Notify 的 Task。Executor 向 Scheduler 汇报子使命进展,Scheduler 核算出全体进展,当总进展产生改变后生成 NotifyTask 告诉 Executor 进行处理。
  1. PostProcess(总) :成果处理阶段。使命履行完毕后可在此函数进行后续处理,单机运转。当一切分片子使命都履行完后,Scheduler 会将子使命的履行成果发送到此函数处理。(入参:每一个子使命 ShardingProcess 后的成果数组)

履行器需求完成ShardingProcessor接口以供调度器进行调度。调度进程如下:

如何做好分布式任务调度——Scheduler 的一些探索

Scheduler 支撑分片使命重点在于丰厚调度模型,提升调度器调度才能,完善履行器履行才能来到达支撑分片使命的目的。

调度侧才能

分批调度的才能

调度侧需求依据使命进展顺次生成 PreTask、ProcessTask、NotifyTask、PostTask 来调度履行器ShardingProcessor中的 PreProcess – Process – Notify – PostProcess 四个办法。单机调度PreProcess,PostProcess,Notify,并行调度 PostProcess,全体调度呈现总-分-总的办法。调度进程如下:

如何做好分布式任务调度——Scheduler 的一些探索

数据拆分的才能

数据拆分即使命分片,指的是将单一使命依照特定的逻辑切分为多个独立的子使命,将其分配到不同的节点履行,以提高使命的履行功率。

而 Scheduler 要处理的使命内部或许存在依靠联系(比方 kunlun 事务中 metadata 批量创立的需求,由于存在 lookup 和 reference 字段等,记载创立之间存在拓扑联系),所以在履行时需求优先级的概念,而不能被简略拆分为独立的子使命。

为了支撑带优先级的使命分片,Scheduler 接纳的分片使命的数据特点如下:

  • 是事务方自定义使命的二维数组;
  • 榜首个维度是使命履行的优先级,坐落同一优先级下的使命并发履行,坐落不同优先级的使命按优先级串行履行;
  • 第二个维度是同一优先级下的自定义使命列表。关于自定义使命的结构,Scheduler 不感知。事务方能够挑选存储使命详情或是主键信息,并自定义处理逻辑,而 Scheduler 只做分片和调度作业。

二维数组能够是下面这样:

[
    [
        // 榜首优先级的使命1,能够是主键
        101,
        // 榜首优先级的使命2,能够是SQL语句
        "Insert into tablename xxx",
        // 榜首优先级的使命3,能够是结构体等等...
        {
            "ID": 999,
            "Name": "zhangsan"
        }
    ],
    [    
        // 第二优先级的使命1、2、...
        102,
        103,
        ...
    ]
]

了解了待分片使命的结构,咱们来评论如何对使命进行分片。比方,分片的数量由什么决议,单个分片上的信息是如何分配的,不同分片又是不同分配到不同的处理器上的…

  1. 分片数的确定

分片数的确定依据以下参数的值:数据量、使命创立时用户指定的单片最大数量单片最小数量,以及实践可用的履行器数量

  1. 分片算法

分片特征值(sharding key)的挑选要遵从的准则应该是依据最常用的拜访办法

由于 Scheduler 分片时并不关心事务数据的结构,所以选用数据数组的下标来作为分片特征值。

由于分片数量确定后,不涉及到由于分片的添加或削减对数据进行 Rehash 的情况,所以无需考虑虚拟节点、一致性哈希等办法进行分片。

这里选用哈希分片的分片算法,原因是既能够均匀分布数据,完成起来也很简略。

  1. 分片的存储和派发

分片完成后,需求给每个分片创立一个 Task,并把分片的数据存储下来。

关于 Task 的派发,依据上面关于分片数的评论,能够得到分片数和 Executor 数的联系:

  • 在数据量合适的情况下(单片最大数量和单片最小数量设置合理时,是最遍及的情况),分片数和 Executor 的数目是一致的;
  • 当数据量很小时,会出现分片数小于 Executor 数;
  • 当数据量很大时,会出现分片数大于 Executor 数,甚至或许是后者的几倍。

为了让各个 Task / 各个分片 能够均匀派发给各个 Executor,也为了避免某个 executor 挂掉时,其他 Executor 不能均匀分摊挂掉的节点原先承当的分片,需求选用合理的分片战略。

在分片时,咱们确保了各分片的数据是尽量均匀分布的,所以从分片到 Executor 的分配办法能够尽或许地简略,选用平均分配的战略即可。关于挂掉的节点所承当的分片,也选用同样的战略派发到存活的 Executors 上即可。

例如:

有 3 个 Executor,分成 9 片,则每个 Executor 分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8]

如果 Executor 1 挂掉,则将 1 的分片平均分配到Executor 2、3: 2=[0,1,3,4,5], 3=[2,6,7,8]

平均分配* :关于不能均分的情况,为了避免靠前的 Executor 总是承当更多的压力,能够依据待分配分片数量的奇偶来决议是升序分配仍是降序分配。

进展告诉的才能

Scheduler 支撑告诉 Executor 使命履行的全体进展

  1. Executor 上报子使命进展至 Scheduler
  1. Scheduler 核算总使命进展
  1. 总使命进展产生变化,则生成 NotifyTask 发送至 Executor

如何做好分布式任务调度——Scheduler 的一些探索

履行侧分批履行的才能

履行侧需求完成并注册 SDK 供给的ShardingProcessor接口,来处理由调度侧发来的多种类型的Task。

PreProcess

预处理办法,能够进行但不限于以下的操作:

  1. 发动参数不符合 Scheduler 规定格式,能够经过 PreProcess 办法进行一次转化。
  1. 数据存在导入优先级,能够在 PreProcess 中编写核算拓扑联系的办法。

如果不需求预处理,可直接在办法内 return,分片时数据运用发动时的 Data

func (s *ShardingProcessor) PreProcess(ctx context.Context, tc taskContext) error{
    oldData := tc.GetData()
    // 用户事务, 数据处理
    newData := Transform(oldData)
    // 回来带拓扑排序的数据
    tc.SetResult(newData)
    return nil
}

ShardingProcess

分片处理函数,主要是进行数据更新、创立操作。ShardingProcess 的入参是切分后的数组([]interface{})。Executor 需求对参数进行两部分额定处理:

  1. []interface{}中的interface{}断语成详细struct{}
  1. 处理后。核算当时子使命履行进展,进行上报;如果省掉上报,服务端以分片粒度生成 NotifyTask告诉履行器。
func (s *ShardingProcessor) ShardingProcess(ctx context.Context, tc taskContext) error{
   taskData := tc.GetData()
   for _, data := range taskData{
       // 用户事务, 数据处理
   }
   tc.SetResult(...)
   return
}

PostProcess

接受一切分片处理成果,进行后续处理,如生成过错文件。

func (s *ShardingProcessor) PostProcess(ctx context.Context, tc taskContext) error{
    taskData := tc.GetData()
    for _, result := range taskData{
        // do something
    }
    // 用户事务
    tc.SetResult(...)
}

Notify

供给给子使命上报的才能,Scheduler 会依据一切子使命上报成果核算进展,告诉 Executor,告诉粒度为数据条数。如果接入方不主动上报子使命进展,Scheduler 会依据子使命完成度进行告诉,告诉粒度为分片粒度。

分片使命流程

如何做好分布式任务调度——Scheduler 的一些探索

削峰填谷-流量操控

布景:供给才能,而非施加压力

在 Scheduler 规划初期时,更多的是把注意力放在了如何能够快速、精确、低推迟的触发使命,为此还多次优化了触发器、分配器、差遣器三大模块的轮询逻辑,可是疏忽了使命量过大时下流能否抗住流量的问题。

如果 Scheduler 在调度时无法精确感知下流压力,那么很简略将下流打挂,如:在守时使命初次上线时,由于 kunlun 的装包机制导致数千个运用下装备了同样的守时使命,虽然一个包内的数十个守时使命触发时刻涣散,可是运用包之间的同一个使命触发时刻相同,导致下流需求在同一时刻处理数千个使命,再加上使命的处理流程还会经过音讯中间件进行分散,导致数据库在使命履行阶段一向处理低 IDLE 阶段。

剖析:流量追寻,剥茧抽丝

现在大部分后端服务,经过剖析使命的流量走向,能够大致承认每一条使命在履行进程中不管分散还对错分散流量都会走向 DB,流量图大致如图。

如何做好分布式任务调度——Scheduler 的一些探索

使命的流量终究打到了 DB,所以流量操控的方针就更加明晰:对 DB 的流量操控。

需求对 DB 进行流量操控,那么就要设定合理的目标,理论上,只需目标选用的满足合理,就能严厉、精确的操控流量,目标则需求具备以下条件:

  1. 实时性。能够按时反响数据库健康状况。
  1. 权威性。能够精确反响数据库健康状况。

优先级: 实时性 > 权威性。当一个目标的实时性不够高,那么它的权威性就不再有价值。

只需求实时监听着 DB 的目标,来判别使命是立刻履行,仍是推迟履行就能有效的维护 DB。

目标挑选

  1. 消费 metrics 的监控点,关于数据库的打点信息十分全面,能够十分容易的获取到数据库宿主机的 CPU、内存或数据库本身的连接数、查询数等目标,这些目标的权威性毋庸置疑,可是 metrics 经过将目标搜集到本地署理,署理每 30s 做一次聚合发送至服务端,其时效性太差。
  1. 数据库不可用要素为:很多使命触发 -> DB 拜访流量增高 -> CPU idle 下降 -> 数据库不可用。形成 CPU idle 下降的要素为 DB 流量增高,能够将 DB 的流量作为目标进行流量操控,缺点是需求自己搜集目标。

目标搜集

目标规模

反映 DB 压力较为直接的目标是 cpu idle,但考虑到服务部署往往多实例以及 cpu idle 搜集难度大的情况,以近似目标来代替。另一方面,经过历史数据剖析,DB 流量与 cpu idle 有必定的关联,因而以 DB 流量作为 DB 压力目标。

如何做好分布式任务调度——Scheduler 的一些探索

数据存储

参考限流的完成方案,选用独自的 Redis 存储流量数据,以 1s 为时刻窗口作为 Redis key,每个时刻窗口的流量作为 Redis value,每次产生 DB 操作时更新流量数据。体系中存在多个 DB,每个 DB 独自计算,在 Redis key 中加入 db 信息。Redis key 设置10s过期时刻,查询时依据过去3个窗口的加权平均(80%/15%/5%)作为当时流量,以处理窗口交界处的突发流量。

搜集办法

现在 DB 流量已有 metrics 监控数据,但由于 metrics 会在本地聚合 30s 数据后上报,至少会有 30s的推迟。而形成 DB 压力大的守时使命多为短期集中触发,运用 metrics 数据会有感知不及时的问题,因而需求额定搜集数据。参考 DB metrics 数据搜集的办法,经过 Gorm 的 callback 机制刺进详细的搜集逻辑,削减对事务代码的侵入。

func SetMonitorCallBack(db *gorm.DB) {
   db.Callback().Create().Before("gorm:before_create").Register("metric:before_create", beforeCreateCallback)
   db.Callback().Delete().Before("gorm:before_delete").Register("metric:before_delete", beforeDeleteCallback)
   ...
}
func beforeCreateCallback(scope *gorm.Scope) {
    beforeCallback(scope, "create")
}
func beforeCallback(scope *gorm.Scope, method string) {
    ...
}

在搜集逻辑上,需求考虑以下几个问题:

  • 功用:callback 中需求尽量削减推迟,优先运用异步的办法上报数据。运用 channel 充当行列,callback 中将数据写入 channel,当 channel 容量满时丢掉数据,避免堵塞。还有异步协程从 channel 中取数据上报。统筹时效性和网络开支,在上报前预先在本地以 100ms 窗口做聚合。
type MetricType int8
const (
  QueryCount MetricType = 1
)
type DBMetric struct {
    DBName   string
    DBMethod string
    Type MetricType
    Timestamp int64
    Value     interface{}
}
// callback中将metric数据写入channel
func beforeCallback(scope *gorm.Scope, method string) {
    dbName := getStringValueFromCtx(scope.DB().Ctx, CtxVariableDBName)
    curMs := time.Now().UnixNano()/int64(time.Millisecond)
    metric := DBMetric{dbName, method, QueryCount, curMs, 1}
    select {
    case ch <- metric:
    default:
    // channel is full, ignore this metric
    }
}
// 异步上报,在resource_sdk的Init()中依据装备判别是否发动此协程
func metricAgent() {
    windowSize := 100
    // window time -> (metric key(dbName|dbMethod|type) -> metric)
    aggrMetrics := map[int64]map[string]DBMetric
    timer := time.NewTicker(windowSize)
    defer func() {
       timer.Stop()
    }()
    for {
       select {
       case msg := <-ch:
           curWindow := curMs/windowSize
           更新aggrMetrics中curWindow对应的metric(对queryCount来说是加1case <-timer.C:
           将aggrMetrics中key+windowSize<=curTime的数据上报并清除
      }
    }
}
  • 运维本钱:搜集逻辑会运转在各个服务上,考虑到后续会搜集更多的目标,直接上报 Redis 需求给各个服务开通读写权限,运维办理本钱较高。依据此,运用额定的服务来办理目标数据,接纳上报的目标数据存入 Redis,并经过接口的办法供给查询服务。目标存放在更加聚集在 DB 资源的 resource 服务中,在 resource 服务中经过添加接口的办法完成目标数据的办理功用,一起,为了不影响 resource 原有事务的稳定性,运用独自的集群供给服务。

如何做好分布式任务调度——Scheduler 的一些探索

Scheduler 调度反应

流量阈值约束

Scheduler 调度速率与 DB 负载之间的联系较为杂乱,本期选用简略的阈值反应机制,设置 DB 流量阈值,当流量超出阈值时,中止 Scheduler 当时周期调度。依据历史数据,设置阈值为 5K。

当流量未超出阈值时,不能预估使命对 DB 流量的影响,选用简略战略对使命数进行约束:

使命数 = max((DB流量阈值 – DB当时流量)* 100 / DB 流量阈值, 0)

DB路由

现在 Kunlun 的 DB 资源依据租户进行分配,不同租户的数据和流量会落在不同的 DB 上。Scheduler 会记载 Job 地点租户,所以在调度时,需求依据租户查找真实的 DB 资源,经过 DB 目标的健康状况来决议是否差遣使命:

  • 查询 Job 地点租户分配的 pg 资源标识
  • 依据 pg 资源标识去 Redis 查询对应的流量数据

调度操控流程

如何做好分布式任务调度——Scheduler 的一些探索

– END –