kube-scheduler 在k8s集群中担任pod的调度。他首要的职责是监听pod 资源,对没有绑定node 的pod,依据特定的调度算法与战略挑选集群中最优的work node去运转这个pod。
本文依据kube-scheduler v1.21.1版本,对kube-scheduler的运转机制结合代码做一个简略的解读。
基本原理
kube-scheduler在规划上并不杂乱,为pod获取最优节点首要分为2个阶段3个过程
调度阶段
- predicates,为带调度的pod过滤集群中不适宜运转的节点。kube-scheduler供给了一些过滤战略,多个战略可以组合运用。
- priority,经过predicate过滤后, priority担任对剩余的节点「评分」,比如剩余资源较多的节点会取得较高的评分,契合Pod NodeAffinityPriority的节点会取得较高的评分。 priority阶段只会回来一个节点,当最高分的节点有多个时将依照round-robin的办法挑选Node, kube-scheduler 内置了一些「评分战略」,相同也可以组合运用。
绑定阶段
- bind,为pod绑定提名节点,发送请求到apiserver。
扩展调度战略
当kube-scheduler默许加载的过滤战略与优先级算法不满意咱们的需求时,kube-scheduler也供给了接口让咱们「扩展」调度战略。
scheduler policy
- 经过kube-scheduler的装备文件,挑选运用哪些过滤战略和评分算法,只答应运用k8s 已完结好的过滤战略和评分战略
- 在policy装备中界说scheduler extender,经过webhook来扩展调度战略,开发者供给两个接口别离用来处理”过滤”和“评分”。 一个接口用来处理kube-scheduler现已过滤后的节点列表。 一个接口用来处理kube-scheduler现已评分后的节点列表。
scheduler framework
- pod的调度流程以插件的办法完结,原有的过滤战略和评分战略悉数都以in tree plugin的办法完结。对过滤、评分战略的扩展以out of tree plugin办法完结。 整个调度流程供给丰厚的扩展点,每个扩展点绑定plugin,在调度流程走到相应的扩展点时,按次序履行该扩展点的plugin。开发者可以向kube-scheduler注册插件,以介入pod的调度绑定流程。
- kube-scheduler v1.15版提出,v1.18版release,后续扩展调度战略的主流计划。
Scheduler Framework
调度系统的在调度时的意图往往是动态的,或许是本钱优先、质量优先、最大资源利用率优先等等,这与事务场景有关。
正是由于调度系统的调度战略是与事务场景相相关的,很难用一套调度战略满意一切事务场景。越来越多的调度战略加入到kube-scheduler中,使得kube-scheduler的调度逻辑越来越杂乱,杂乱的调度器是难以维护的,前期的kube-scheduler尽管也具备了扩展才能使得开发者可认为特定的事务场景规划调度战略,但受到如下方面的约束。
- 扩展点的数量只要两个:过滤后、评分后。扩展程序只能在kube-scheduler过滤后与评分后介入调度流程。
- kube-scheduler与扩展程序的程序运用HTTP通讯,每一次通讯都规划json的序列化与反序列化,功能较差。
- 扩展程序作为一个独立的程序,要么只处理kube-scheduler传递过来的数据,要么自行构建一套k8s资源缓存,存在额定的资源开销
- 扩展程序无法感知被调度的资源当时处于什么状况。假如一个pod被kube-scheduler断定为不行调度,扩展程序是无法感知的。
由于这些约束无法构建一个高功能、多功能的调度器,为此社区提出scheduler framework来处理kube-scheduler的扩展与功能问题。使得调度程序的核心更简略,方便维护。
接下来咱们来聊一聊scheduler framework界说的一些**「方针」**别离表明什么,承当什么效果。
ExtensionPoints(扩展点)
扩展点表明在调度绑定周期中的一个“阶段”,kube-scheduler会在每一个阶段履行做一些作业,以完结pod的调度绑定。Scheduler Frameworker的作业流依照以下次序,履行相应扩展点的插件。每一个扩展点可以绑定多个插件,一个扩展点的每一个插件都需求回来处理结果,假如处理结果为过错,则该pod将直接打回「待调度行列」等候下一次调度。
Predicates
从待调度的pod行列中,拿出一个pod开端完结调度绑定逻辑时,将依次经过预界说的「扩展点」,扩展点关于开发者来说是一个接口,完结这个接口的方针被称之为「插件」。
一个插件可以完结多个扩展点接口。
多个插件完结同一个扩展点接口,Schduler会按序履行每一个插件。
- 在走到post filter扩展点时,按次序履行相应的plugin,假如有一个插件回来成功或失利,那么其他的plugin都不会履行。
- 在走到bind阶段时,按次序履行相应的plugin,假如有一个plugin履行了绑定,那么其他plugin都不会履行
CycleState
每一个pod的调度绑定流程,都会相关一个CycleState方针,它用于存储当时pod整个调度绑定流程的一切数据,在每一个扩展点中,一切的plugin都可以拿到这个方针,可以从该方针读取或写入一些必要的信息。
CycleState是对一切plugin共享的。
扩展点详解
在每一个扩展点,插件都可以介入进来履行一些“操作”,下面临每一个扩展点做一个简略的阐明。
- queueSort 待调度行列的重排序,plugin可以在这个阶段把优先级比较高的pod放到行列的前面,使得他们可以优先调度。
- PreFilter ,过滤之前的预处理,查看pod,可以将pod标记为不行调度。这个扩展点framework是不会供给node信息的,只能对pod做查看。尽管可以经过ClientSet去拿一切的node信息,但不推荐,在大规划k8s环境下,会影响功率。
- Filter ,查看node是否可以运转待调度的pod。插件在这个扩展点拿到的是node信息,不是node列表。插件仅需查看framework供给的node是否可以运转当时的pod
- PostFilter,在filter扩展点,无法找到一个适宜的node,被断定为无法调度时,就会触发该扩展点插件履行。内置的插件经过这个扩展点,完结了高优先级pod资源抢占机制。
- PreScore,前置评分首要用于对pod和候选node列表做一个查看,并将处理结果放到CycleState中,以方便在Score扩展为评分逻辑供给一些辅助信息。
- Score, 给filter后产生的可用节点列表打分,回来可用node列表及其评分。
- NormalizeScore,他是Score扩展点接口界说的一个办法,在评分完结后,还可以批改每个node分数。它必需求保证评分的范围在0-100以内。
- framework在拿到评分结果后会挑选评分最高的节点回来。
- Reserve,完结该扩展点的插件,可为pod预留相关资源,如pod相关了pvc资源,那么在kube-scheduler的缓存中提前为pod绑定pvc与pv资源。
- Reserve扩展点需求完结reserve、unreserve办法,别离是履行资源预留与回滚资源预留。
- kube-scheduler的VolumeBinding插件在这个扩展点完结了pod相关的pvc、pv资源预留。
- Permit,可以阻止或延迟pod绑定node
- PreBind,绑定前履行一些使命,如将pod的相关资源的绑定耐久化至k8s。假如PreBind失利会履行Reserve扩展点的Unreserve办法回滚pod的相关资源绑定。
- Bind, 将pod和node绑定,完结bind扩展点的插件按次序调用,只要有一个插件完结绑定,后续插件都会悉数跳过。Bind失利会履行Reserve扩展点的Unreserve办法回滚pod的相关资源绑定。
- PostBind, 绑定后履行一些逻辑,可以做一些整理作业。
任意一个扩展点插件回来过错都会中断该pod的调度,并回来到调度行列
Multi Scheduler
Scheduler Framework的扩展才能远远超越之前Scheduler Policy,一起依据Scheduler Framework引入了新的Scheduler装备:KubeSchedulerProfile。
在KubeSchedulerProfile中可以界说每一个扩展点运用哪些插件,禁用哪些插件。
在KubeSchedulerProfile中可以界说多个Profile。一个Profile表明一个调度器,在kube-scheduler初始化时会读取KubeSchedulerProfile创立多个framework方针,kube scheduler 经过pod.spec.schedulerName,找到对应的framework方针,运用该方针为pod履行调度绑定流程
如下界说了两个调度器:default-scheduler、no-scoring-scheduler。
apiVersion: kubescheduler.config.k8s.io/v1beta2kind: KubeSchedulerConfigurationprofiles: - schedulerName: default-scheduler - schedulerName: no-scoring-scheduler plugins: preScore: disabled: - name: '*' score: disabled: - name: '*'
依据Scheduler Framework的调度绑定完结
调度行列的规划
kube-scheduler的pod调度行列由PriorityQueue方针完结。他最核心的数据结构首要是3个子行列
activeQ
activeQ子行列包括正在等候调度的 pod,由「堆」数据结构完结。queueSort扩展点的插件可以对该行列中的pod 做排序,以完结高优先级的pod优先调度
kube-scheduler内置的queueSort扩展点插件 queueSortPlugin会将activeQ行列中的pod,依照优先级与创立时刻排序。将高优先级、创立时刻早的pod放在行列头部。
一起activeQ中的pod 被拿出来时,会相关一个SchedulingCycle。他是调度行列里边的一个计数器,每次pod被拿出来就加1。
podBackoffQ
backoff 是并发编程中常见的一种机制,就是假如一个使命重复履行,但仍旧失利,则会依照失利的次数进步重试等候时刻,防止频繁重试浪费资源。
运转失利的pod都会放到backoff行列中,并在一段时刻后移至activeQ中,这个”一段时刻“详细是多久,则取决于它的失利次数,最大不会超越10s(DefaultPodMaxBackoffDuration)
当你调查k8s集群里一个 一向crash的pod,他的status会变成crashLoopBackoff,像这种pod都会进入podBackoffQ行列
unschedulableQ
在调度时,被断定为无法调度的pod,都将寄存至该行列。 无法调度是指pod的要求,当时集群无法满意,如pod的cpu要求高,当时集群一切节点都不满意。此刻pod就会被断定为无法调度进入unschedulableQ行列。
调度行列的运用
调度行列在运转期间:
每隔1s查看podBackOffQ行列中是否有pod可以放进activeQ
每隔30s把unschedulableQ中长时刻(默许60s)处于不行调度的pod移至backoff行列。
每隔0s,scheduler方针从activeQ获取待调度的pod
当一个pod在调度周期被断定为不行调度进入到unschedulerQ行列后,假如集群资源发生了改变,比如新增了node,删去了pod等。一个不行调度的pod就有了调度成功的或许性。它将等候60s后,定时使命触发将这个pod移动至backoffQ行列,并再次从backoffQ行列移动至activeQ行列,准备下一次调度。
这姿态关于pod的调度会是一个功率的很低的作业,由于他需求的时刻太长了。为了进步功率,scheduler方针监听pod与node资源发生改变时,都会调用PriorityQueue方针的movePodsToActiveOrBackoffQueue办法。该办法会将不行调度的pod会被从头放进activeQ或者backoffQ,一起moveRequestCycle会设置为当时的schedulingCycle。
至于详细移动到哪个行列,则依据moveRequestCycle是否大于等于SchedulingCycle,假如大于等于则放到backOff行列,不然放到unschedulerQ行列。结合前面的剖析,只要资源发生改变时,moveRequestCycle才有或许大于等于当时的SchedulingCycle。那么就阐明,在判别pod无法调度后,k8s集群环境现已发生了改变。那么此刻断定无法调的pod在集群改变后,还是有或许可以调度的,所以放到backOff行列中,是为了让他赶快建议调度重试。
终究作业流程如下:
修改
调度主流程
- scheduler方针从调度行列的activeQ行列取待调度的pod
- scheduler方针依据pod的schedule Name 字段获取对应的framework方针。
- scheduler方针调用generic scheduler方针履行framework的 queue sort、prefilter、filter、prescore、score扩展点。
- 若generic scheduler在履行时,回来过错
- 过错类型为FitError,表明过滤阶段失利,触发抢占机制,履行framework的postfilter扩展点
- 将pod从头加入调度行列
- scheduler 方针履行assume,对拿到提名节点的pod在缓存中履行节点绑定操作,这样绑定流程可以异步去履行。 一起,在kube-scheduler的pod缓存中,这个pod是现已在node上正常运转的,那么在后续pod调度时,关于节点资源的评价,也是包容这个pod现已占用节点上的一部分资源的。
- scheduler方针履行framework的Reserve、Permit扩展点。
- goroutine异步履行framework的Prebind、Bind、PostBind扩展点。
调度周期是同步履行的,完结调度周期的作业流后,会经过goroutine异步履行绑定周期,这样可以无需等候绑定结果,立刻为下一个pod开启调度。
大规划K8s集群调度瓶颈
当k8s 集群节点规划比较大时,假如每一个pod都需求遍历一切node来断定哪个node是”适宜“的。那么这个调度流程功率会变的特别低。
kube-scheduler会操控参加调度的node数量来进步调度功率,在默许情况下,假如k8s节点的数量少于100个,那么一切的节点都会参加调度。不然,将依据设置的节点百分比挑选部分节点参加调度。
操控节点数量
- 调用prefilter 扩展点的插件,查看pod是否可以被调度,假如任一PreFilter插件回来过错,那么pod将打回待调度行列
- 遍历一切节点,并调用Filter扩展点的插件,并记录适宜该pod节点的数量,一旦达到数量约束或plugin的filter办法回来失利,将中止遍历。
- 调用filter扩展点插件时,或许会履行两遍filter,详细的原因和抢占功能有关,文章末尾解说原因
- 在寻觅适宜pod的node列表时,将开启16个(默许16个)goroutine 并行筛选。每个goroutine会各自担任一切节点中的一部分。
- 记录中止遍历时,node列表遍历到什么方位。鄙人一个pod的调度周期,将从这个方位开端遍历node列表。这样可以保证集群中每一个节点的都有公平的被调度机会。
代码剖析如下:
核算节点数量约束
func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) { // 关于节点数量少于100的,悉数节点参加调度 // percentageOfNodesToScore是集群一切节点参加数量的百分比,假如设置为100,就是一切节点都参加调度 if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 { return numAllNodes } adaptivePercentage := g.percentageOfNodesToScore//当numAllNodes大于100时,且装备的百分比小于等于0,那么这儿需求核算出一个百分比// 核算公式:百分比 = 50 - (总节点数)/125 if adaptivePercentage <= 0 { basePercentageOfNodesToScore := int32(50) adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125 if adaptivePercentage < minFeasibleNodesPercentageToFind { adaptivePercentage = minFeasibleNodesPercentageToFind } }
numNodes = numAllNodes * adaptivePercentage / 100 if numNodes < minFeasibleNodesToFind { return minFeasibleNodesToFind }
return numNodes}
获取可调度节点
// findNodesThatPassFilters finds the nodes that fit the filter plugins.func (g *genericScheduler) findNodesThatPassFilters( ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod, diagnosis framework.Diagnosis, nodes []*framework.NodeInfo) ([]*v1.Node, error) { // 核算node数量约束 numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))
// 寄存适宜的node列表 feasibleNodes := make([]*v1.Node, numNodesToFind) // 假如没有插件完结Filter扩展点,就直接截取一切node列表中的一段 // 从上一次中止查找的node 后边开端截 if !fwk.HasFilterPlugins() { length := len(nodes) for i := range feasibleNodes { feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node() } g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length return feasibleNodes, nil } errCh := parallelize.NewErrorChannel() var statusesLock sync.Mutex var feasibleNodesLen int32 ctx, cancel := context.WithCancel(ctx) // 履行一切插件的Filter, checkNode := func(i int) { // 从上一个调度周期中脱离的节点开端查看节点是否适宜,履行一切插件的filter nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)] status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo) if status.Code() == framework.Error { errCh.SendErrorWithCancel(status.AsError(), cancel) return } // 假如这个节点适宜,那么就把他放到feasibleNodes列表中 if status.IsSuccess() { length := atomic.AddInt32(&feasibleNodesLen, 1) if length > numNodesToFind { cancel() atomic.AddInt32(&feasibleNodesLen, -1) } else { feasibleNodes[length-1] = nodeInfo.Node() } } else { statusesLock.Lock() diagnosis.NodeToStatusMap[nodeInfo.Node().Name] = status diagnosis.UnschedulablePlugins.Insert(status.FailedPlugin()) statusesLock.Unlock() } }
beginCheckNode := time.Now() statusCode := framework.Success defer func() { // We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins // function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle. // Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod. metrics.FrameworkExtensionPointDuration.WithLabelValues(runtime.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode)) }()
// 开协程履行filter,直到数量达到约束 fwk.Parallelizer().Until(ctx, len(nodes), checkNode) // 设置下次开端遍历node的方位 processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap) g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)
feasibleNodes = feasibleNodes[:feasibleNodesLen] if err := errCh.ReceiveError(); err != nil { statusCode = framework.Error return nil, err } return feasibleNodes, nil}
抢走低优先级pod的资源
kube-scheduler为了保证高优先级的pod可以优先调度,在pod被断定为无法调度时,并不会直接将其放到unschedulerQ行列,而是查看有没有优先级比当时pod低的的pod可以抢占。假如有则履行抢占流程。
pod的priority用来表明pod的优先级,假如没有设置这个字段,那么pod的优先级就是0。k8s操控平面的一切组件悉数都是高优先级,其优先级都被设置为2000001000。
”抢占“的逻辑由PostFilter扩展点来完结,后边咱们首要剖析kube-scheduler内置的DefaultPreemption插件完结PostFilter时,是如何完结抢占逻辑的。
func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { defer func() { metrics.PreemptionAttempts.Inc() }()
nnn, status := pl.preempt(ctx, state, pod, m) if !status.IsSuccess() { return nil, status } // This happens when the pod is not eligible for preemption or extenders filtered all candidates. if nnn == "" { return nil, framework.NewStatus(framework.Unschedulable) } return &framework.PostFilterResult{NominatedNodeName: nnn}, framework.NewStatus(framework.Success)}
DefaultPreemption插件首先会去获取node列表,然后获取最新的要履行抢占的pod信息,接着分下面几步履行抢占:
- 查看是pod否可实施抢占。调用PodEligibleToPreemptOthers办法,查看当时pod是否可以进行抢占,假如当时的pod现已抢占了一个node节点且该节点有pod正在履行高雅退出,那么不应该履行抢占。
- 查找可抢占的节点。调用FindCandidates找到一切node中能被抢占的node节点,并回来候选node列表以及node节点中需求被删去的pod(献身者);
- 寻觅最佳抢占方针。调用SelectCandidate办法在一切候选列表中找出最适宜的node节点履行抢占;
- 履行抢占。调用PrepareCandidate办法删去被抢占的node节点中victim pod(献身者),以及清除献身者的NominatedNodeName字段信息;
- 献身者pod资源发生改变,被kube-sheculer监听到,从头加入调度行列,等候从头调度。
PodEligibleToPreemptOthers
这个办法会查看该pod是否现已抢占过其他node节点,假如是的话就遍历这个节点上的一切pod方针,假如发现节点上有pod资源方针的优先级小于待调度pod资源方针并处于terminating状况,阐明这个node正在履行低优先级pod驱逐,现已有正在删去的pod,等候删去成功后,开释资源,高优先级的pod就会占用这个node。
func PodEligibleToPreemptOthers(pod *v1.Pod, nodeInfos framework.NodeInfoLister, nominatedNodeStatus *framework.Status) bool { if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever { klog.V(5).InfoS("Pod is not eligible for preemption because it has a preemptionPolicy of Never", "pod", klog.KObj(pod)) return false } nomNodeName := pod.Status.NominatedNodeName
// 查看pod是否现已有提名node,假如有那么阐明现已履行过抢占 if len(nomNodeName) > 0 { if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable { return true } // 获取抢占的node if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil { // 查看这个node中是否存在正处于terminating状况的pod,且优先级比当时pod低 podPriority := corev1helpers.PodPriority(pod) for _, p := range nodeInfo.Pods { if p.Pod.DeletionTimestamp != nil && corev1helpers.PodPriority(p.Pod) < podPriority { // There is a terminating pod on the nominated node. return false } } } } return true}
FindCandidates
FindCandidates办法首先会获取node列表,然后调用nodesWherePreemptionMightHelp办法来找出predicates 阶段失利但是经过抢占也许可以调度成功的nodes,由于并不是一切的node都可以经过抢占来调度成功。最后调用dryRunPreemption办法来获取契合条件的node节点。
func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, *framework.Status) { allNodes, err := pl.fh.SnapshotSharedLister().NodeInfos().List() if err != nil { return nil, nil, framework.AsStatus(err) } if len(allNodes) == 0 { return nil, nil, framework.NewStatus(framework.Error, "no nodes available") } // 找到可以占用的node potentialNodes, unschedulableNodeStatus := nodesWherePreemptionMightHelp(allNodes, m) if len(potentialNodes) == 0 { klog.V(3).InfoS("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod)) // In this case, we should clean-up any existing nominated node name of the pod. // 假如当时pod不存在可以抢占的node,那么就把pod的提名node信息给删掉 if err := util.ClearNominatedNodeName(pl.fh.ClientSet(), pod); err != nil { klog.ErrorS(err, "cannot clear 'NominatedNodeName' field of pod", "pod", klog.KObj(pod)) // We do not return as this error is not critical. } return nil, unschedulableNodeStatus, nil } // 获取PDB方针,PDB可以约束一起终端的pod资源方针的数量,以保证集群的高可用性 pdbs, err := getPodDisruptionBudgets(pl.pdbLister) if err != nil { return nil, nil, framework.AsStatus(err) }
offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(potentialNodes))) if klog.V(5).Enabled() { var sample []string for i := offset; i < offset+10 && i < int32(len(potentialNodes)); i++ { sample = append(sample, potentialNodes[i].Node().Name) } klog.Infof("from a pool of %d nodes (offset: %d, sample %d nodes: %v), ~%d candidates will be chosen", len(potentialNodes), offset, len(sample), sample, numCandidates) } // 寻觅契合条件的node,并封装成candidate数组回来 candidates, nodeStatuses := dryRunPreemption(ctx, pl.fh, state, pod, potentialNodes, pdbs, offset, numCandidates) for node, status := range unschedulableNodeStatus { nodeStatuses[node] = status } return candidates, nodeStatuses, nil}
SelectCandidate
这个办法里边会调用candidatesToVictimsMap办法做一个node name和victims映射map,然后调用pickOneNodeForPreemption履行首要过滤逻辑。
func SelectCandidate(candidates []Candidate) Candidate { // 假如没有候选node,就直接回来nil if len(candidates) == 0 { return nil } // 假如只要一个候选node,就直接回来该node if len(candidates) == 1 { return candidates[0] } // 拿到一切候选node里边,需求“献身”的pod 映射关系 victimsMap := candidatesToVictimsMap(candidates) // 挑选一个候选node candidateNode := pickOneNodeForPreemption(victimsMap) // Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree // preemption plugins that exercise different candidates on the same nominated node. if victims := victimsMap[candidateNode]; victims != nil { return &candidate{ victims: victims, name: candidateNode, } }
// We shouldn't reach here. klog.ErrorS(errors.New("no candidate selected"), "should not reach here", "candidates", candidates) // To not break the whole flow, return the first candidate. return candidates[0]}
PrepareCandidate
至此拿到了候选node,以及该node上需求献身的pod。
- 删去需求献身的pod。
- 找到一切提名这个node的 pod,且优先级比当时pod低的。
- 去除这些pod的Nominated信息。并且将这些pod移至activeQ行列,让他们从头调度
func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, pod *v1.Pod, pluginName string) *framework.Status { // 删去候选node上 需求献身的pod for _, victim := range c.Victims().Pods { // If the victim is a WaitingPod, send a reject message to the PermitPlugin. // Otherwise we should delete the victim. if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil { waitingPod.Reject(pluginName, "preempted") } else if err := util.DeletePod(cs, victim); err != nil { klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod)) return framework.AsStatus(err) } fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, c.Name()) } metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))
// 找到优先级比当时pod低,且也提名了该候选node的pod。 nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name()) // 删去这些pod的Nominate信息,并移至activeQ行列 if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil { klog.ErrorS(err, "cannot clear 'NominatedNodeName' field") // We do not return as this error is not critical. }
return nil}
抢占总结
高优先级的pod进行”抢占“时,会将pod的nominatedNodeName 字段,设置为被抢占的 Node 的名字。然后,鄙人一周期中决议是不是要运转在被抢占的节点上,当这个Pod在等候的时分,假如有其他更高优先级的 Pod 也要抢占同一个节点,那么调度器就会清空「被抢占者」的spec.nominatedNodeName 字段,然后答应更高优先级的抢占者履行抢占。这也使得「被抢占者」本身也有机会去从头抢占其他节点。
抢占者并不会立刻被调度到被抢占的 node 上,调度器只会将抢占者的 status.nominatedNodeName 字段设置为被抢占的 node 的名字。然后,抢占者会从头进入下一个调度周期,在新的调度周期里来决议是不是要运转在被抢占的节点上,当然,即便鄙人一个调度周期,调度器也不会保证抢占者一定会运转在被抢占的节点上。这样规划的一个重要原因是调度器只会经过标准的 DELETE API 来删去被抢占的 pod,所以,这些 pod 必然是有一定的“高雅退出”时刻(默许是 30s)的。而在这段时刻里,其他的节点也是有或许变成可调度的,或者直接有新的节点被添加到这个集群中来。
调度流程的二次过滤
在调度流程中Filter扩展点或许会履行两次,其首要意图是为了考虑高优先级的pod抢占了node的场景。
第一次会调用addNominatedPods办法将调度行列中找到node上优先级大于或等于当时pod的 pod集合加入到nodeInfo方针中,然后履行FilterPlugin列表。第二次则直接履行FilterPlugins列表。之所以第一次要这么做,是由于在pod抢占node的逻辑中,优先级高的pod先抢占node,抢占成功后将pod.Status.NominatedNodeName字段设置成当时的node,设置完结后scheduler就跑去履行下一个pod的调度逻辑了,这时pod很或许还没有真正在node上面跑起来。所以Scheduler的缓存中其实并没有将这类pod的信息,所以在调度当时pod的时分,会受这些高优先级pod的影响(pod和pod之间有pod亲和性、反亲和性等依靠关系),所以要先假定这类高优先级的pod现已在这个node中跑起来了,现已占用了节点上的一部分资源,这样当时pod的调度是以「节点最少剩下多少资源」来履行filter扩展点。
为了保证万无一失(万一这些高优先级的pod终究没在这个node跑起来),还得把这些高优先的pod排除去再履行一次filter扩展点。 这样,无论其它高优先级的pod在不在这个node上,这个pod都能保证无冲突地调度在这些node上面。
代码完结如下:
func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status { var status *framework.Status
podsAdded := false for i := 0; i < 2; i++ { stateToUse := state nodeInfoToUse := info if i == 0 { var err error podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info) if err != nil { return framework.AsStatus(err) } } else if !podsAdded || !status.IsSuccess() { break }
statusMap := f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse) status = statusMap.Merge() if !status.IsSuccess() && !status.IsUnschedulable() { return status } }
return status}
结束语
得益于scheduler framework的规划,一切的调度战略都以插件的形式注入kube-scheduler,并协同作业完结pod的调度绑定。调度战略的完结分散在各个插件中,本文首要关注kube-scheduler一些重要插件的调度逻辑完结,起到抛砖引玉的效果。
引用
github.com/kubernetes/…
draveness.me/system-desi…
zhuanlan.zhihu.com/p/33823266…
www.infoq.cn/article/lYU…