<aside> ⏰ 本文主要介绍调度器调度一个 Pod 的流程就行源码分析。
</aside>
前面我们分析了 kube-scheduler 组件如何接收命令行参数,用传递的参数构造一个 Scheduler 对象,最终启动了调度器。调度器启动后就可以开始为未调度的 Pod 进行调度操作了,本文主要来分析调度器是如何对一个 Pod 进行调度操作的。
调度器启动后最终是调用 Scheduler 下面的 Run() 函数来开始调度 Pod,如下所示代码:
// pkg/scheduler/scheduler.go
// 等待 cache 同步完成,然后开始调度
func (sched *Scheduler) Run(ctx context.Context) {
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
return
}
sched.SchedulingQueue.Run()
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
sched.SchedulingQueue.Close()
}
首先会等待所有的 cache 同步完成,然后开始执行 SchedulingQueue
的 Run() 函数,SchedulingQueue
是一个队列接口,用于存储待调度的 Pod,该接口遵循类似于 cache.FIFO
和 cache.Heap
这样的数据结构,要弄明白调度器是如何去调度 Pod 的,我们就首先需要弄清楚这个结构:
// pkg/scheduler/internal/queue/scheduling_queue.go
// 用于存储带调度 Pod 的队列接口
type SchedulingQueue interface {
framework.PodNominator
// AddUnschedulableIfNotPresent 将无法调度的 Pod 添加回调度队列
// podSchedulingCycle表示可以通过调用 SchedulingCycle() 返回的当前调度周期号
AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
// SchedulingCycle 返回由调度队列缓存的当前调度周期数。
// 通常,只要弹出一个 Pod(例如调用 Pop() 函数),就增加此数字。
SchedulingCycle() int64
// 下面是通用队列相关操作
// Pop 删除队列的头并返回它。
// 如果队列为空,它将阻塞,并等待直到新元素添加到队列中
Pop() (*framework.QueuedPodInfo, error)
// 往队列中添加一个 Pod
Add(pod *v1.Pod) error
Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
MoveAllToActiveOrBackoffQueue(event string)
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
PendingPods() []*v1.Pod
// 关闭 SchedulingQueue,以便等待 pop 元素的 goroutine 可以正常退出
Close()
// NumUnschedulablePods 返回 SchedulingQueue 中存在的不可调度 Pod 的数量
NumUnschedulablePods() int
// 启动管理队列的goroutine
Run()
}
SchedulingQueue 是一个用于存储带调度 Pod 的队列接口,在构造 Scheduler 对象的时候我们可以了解到调度器中是如何实现这个队列接口的:
// pkg/scheduler/factory.go
// Profiles are required to have equivalent queue sort plugins.
lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
podQueue := internalqueue.NewSchedulingQueue(
lessFn,
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodNominator(nominator),
)
......
return &Scheduler{
......
NextPod: internalqueue.MakeNextPodFunc(podQueue),
......
SchedulingQueue: podQueue,
}, nil
可以看到上面的 internalqueue.NewSchedulingQueue
就是创建的一个 SchedulingQueue 对象,定义如下所示:
// pkg/scheduler/internal/queue/scheduling_queue.go
// 初始化一个优先级队列作为一个新的调度队列
func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue {
return NewPriorityQueue(lessFn, opts...)
}
// 配置 PriorityQueue
type Option func(*priorityQueueOptions)
// 创建一个 PriorityQueue 对象
func NewPriorityQueue(
lessFn framework.LessFunc,
opts ...Option,
) *PriorityQueue {
......
comp := func(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*framework.QueuedPodInfo)
pInfo2 := podInfo2.(*framework.QueuedPodInfo)
return lessFn(pInfo1, pInfo2)
}
......
pq := &PriorityQueue{
PodNominator: options.podNominator,
clock: options.clock,
stop: make(chan struct{}),
podInitialBackoffDuration: options.podInitialBackoffDuration,
podMaxBackoffDuration: options.podMaxBackoffDuration,
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
moveRequestCycle: -1,
}
pq.cond.L = &pq.lock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
return pq
}
从上面的初始化过程可以看出来 PriorityQueue 这个优先级队列实现了 SchedulingQueue 接口,所以真正的实现还需要去查看这个优先级队列:
// pkg/scheduler/internal/queue/scheduling_queue.go
// PriorityQueue 实现了调度队列 SchedulingQueue
// PriorityQueue 的头部元素是优先级最高的 pending Pod,该结构有三个子队列:
// 一个子队列包含正在考虑进行调度的 Pod,称为 activeQ,是一个堆
// 另一个队列包含已尝试并且确定为不可调度的 Pod,称为 unschedulableQ
// 第三个队列包含从 unschedulableQ 队列移出的 Pod,并在 backoff 完成后将其移到 activeQ 队列
type PriorityQueue struct {
framework.PodNominator
stop chan struct{}
clock util.Clock
// pod 初始 backoff 的时间
podInitialBackoffDuration time.Duration
// pod 最大 backoff 的时间
podMaxBackoffDuration time.Duration
lock sync.RWMutex
cond sync.Cond // condition
// activeQ 是调度程序主动查看以查找要调度 pod 的堆结构,堆头部是优先级最高的 Pod
activeQ *heap.Heap
// backoff 队列
podBackoffQ *heap.Heap
// unschedulableQ 不可调度队列
unschedulableQ *UnschedulablePodsMap
// 调度周期的递增序号,当 pop 的时候会增加
schedulingCycle int64
// moveRequestCycle 会缓存 schedulingCycle 的值
// 当未调度的 Pod 重新被添加到 activeQ 中会保存 schedulingCycle 到 moveRequestCycle 中
moveRequestCycle int64
// 表明队列已经被关闭
closed bool
}
这里使用的是一个 PriorityQueue 优先级队列来存储带调度的 Pod,这个也很好理解,普通队列是一个 FIFO 数据结构,根据元素进入队列的顺序依次出队,而对于调度的这个场景,优先级队列显然更合适,可以根据某些优先级策略,优先对某个 Pod 进行调度。
PriorityQueue 的头部元素是优先级最高的带调度的 Pod,该结构有三个子队列:
这里我们需要来弄清楚这几个队列是如何实现的。