<aside> ⏰ 本文主要介绍调度器调度一个 Pod 的流程就行源码分析。

</aside>

前面我们分析了 kube-scheduler 组件如何接收命令行参数,用传递的参数构造一个 Scheduler 对象,最终启动了调度器。调度器启动后就可以开始为未调度的 Pod 进行调度操作了,本文主要来分析调度器是如何对一个 Pod 进行调度操作的。

调度队列

调度器启动后最终是调用 Scheduler 下面的 Run() 函数来开始调度 Pod,如下所示代码:

// pkg/scheduler/scheduler.go

// 等待 cache 同步完成,然后开始调度
func (sched *Scheduler) Run(ctx context.Context) {
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}

首先会等待所有的 cache 同步完成,然后开始执行 SchedulingQueue 的 Run() 函数,SchedulingQueue 是一个队列接口,用于存储待调度的 Pod,该接口遵循类似于 cache.FIFOcache.Heap 这样的数据结构,要弄明白调度器是如何去调度 Pod 的,我们就首先需要弄清楚这个结构:

// pkg/scheduler/internal/queue/scheduling_queue.go

// 用于存储带调度 Pod 的队列接口
type SchedulingQueue interface {
	framework.PodNominator
	// AddUnschedulableIfNotPresent 将无法调度的 Pod 添加回调度队列
  // podSchedulingCycle表示可以通过调用 SchedulingCycle() 返回的当前调度周期号
	AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
  // SchedulingCycle 返回由调度队列缓存的当前调度周期数。 
  // 通常,只要弹出一个 Pod(例如调用 Pop() 函数),就增加此数字。
	SchedulingCycle() int64
  
  // 下面是通用队列相关操作
  // Pop 删除队列的头并返回它。 
  // 如果队列为空,它将阻塞,并等待直到新元素添加到队列中
	Pop() (*framework.QueuedPodInfo, error)
  // 往队列中添加一个 Pod
	Add(pod *v1.Pod) error
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error

	MoveAllToActiveOrBackoffQueue(event string)
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	PendingPods() []*v1.Pod
  // 关闭 SchedulingQueue,以便等待 pop 元素的 goroutine 可以正常退出
	Close()
	// NumUnschedulablePods 返回 SchedulingQueue 中存在的不可调度 Pod 的数量
	NumUnschedulablePods() int
	// 启动管理队列的goroutine
	Run()
}

SchedulingQueue 是一个用于存储带调度 Pod 的队列接口,在构造 Scheduler 对象的时候我们可以了解到调度器中是如何实现这个队列接口的:

// pkg/scheduler/factory.go

// Profiles are required to have equivalent queue sort plugins.
lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
podQueue := internalqueue.NewSchedulingQueue(
	lessFn,
	internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
	internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
	internalqueue.WithPodNominator(nominator),
)
......
return &Scheduler{
	......
	NextPod:         internalqueue.MakeNextPodFunc(podQueue),
  ......
	SchedulingQueue: podQueue,
}, nil

可以看到上面的 internalqueue.NewSchedulingQueue 就是创建的一个 SchedulingQueue 对象,定义如下所示:

// pkg/scheduler/internal/queue/scheduling_queue.go

// 初始化一个优先级队列作为一个新的调度队列
func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue {
	return NewPriorityQueue(lessFn, opts...)
}

// 配置 PriorityQueue
type Option func(*priorityQueueOptions)

// 创建一个 PriorityQueue 对象
func NewPriorityQueue(
	lessFn framework.LessFunc,
	opts ...Option,
) *PriorityQueue {
  ......

  comp := func(podInfo1, podInfo2 interface{}) bool {
		pInfo1 := podInfo1.(*framework.QueuedPodInfo)
		pInfo2 := podInfo2.(*framework.QueuedPodInfo)
		return lessFn(pInfo1, pInfo2)
	}
  ......

  pq := &PriorityQueue{
		PodNominator:              options.podNominator,
		clock:                     options.clock,
		stop:                      make(chan struct{}),
		podInitialBackoffDuration: options.podInitialBackoffDuration,
		podMaxBackoffDuration:     options.podMaxBackoffDuration,
		activeQ:                   heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
		unschedulableQ:            newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
		moveRequestCycle:          -1,
	}
	pq.cond.L = &pq.lock
	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())

	return pq
}

从上面的初始化过程可以看出来 PriorityQueue 这个优先级队列实现了 SchedulingQueue 接口,所以真正的实现还需要去查看这个优先级队列:

// pkg/scheduler/internal/queue/scheduling_queue.go

// PriorityQueue 实现了调度队列 SchedulingQueue
// PriorityQueue 的头部元素是优先级最高的 pending Pod,该结构有三个子队列:
// 一个子队列包含正在考虑进行调度的 Pod,称为 activeQ,是一个堆
// 另一个队列包含已尝试并且确定为不可调度的 Pod,称为 unschedulableQ
// 第三个队列包含从 unschedulableQ 队列移出的 Pod,并在 backoff 完成后将其移到 activeQ 队列
type PriorityQueue struct {
	framework.PodNominator

	stop  chan struct{}
	clock util.Clock

	// pod 初始 backoff 的时间
	podInitialBackoffDuration time.Duration
	// pod 最大 backoff 的时间
	podMaxBackoffDuration time.Duration

	lock sync.RWMutex
	cond sync.Cond  // condition

  // activeQ 是调度程序主动查看以查找要调度 pod 的堆结构,堆头部是优先级最高的 Pod
	activeQ *heap.Heap
  // backoff 队列
	podBackoffQ *heap.Heap
	// unschedulableQ 不可调度队列
	unschedulableQ *UnschedulablePodsMap
  // 调度周期的递增序号,当 pop 的时候会增加
	schedulingCycle int64
  // moveRequestCycle 会缓存 schedulingCycle 的值
  // 当未调度的 Pod 重新被添加到 activeQ 中会保存 schedulingCycle 到 moveRequestCycle 中
	moveRequestCycle int64

	// 表明队列已经被关闭
	closed bool
}

这里使用的是一个 PriorityQueue 优先级队列来存储带调度的 Pod,这个也很好理解,普通队列是一个 FIFO 数据结构,根据元素进入队列的顺序依次出队,而对于调度的这个场景,优先级队列显然更合适,可以根据某些优先级策略,优先对某个 Pod 进行调度。

PriorityQueue 的头部元素是优先级最高的带调度的 Pod,该结构有三个子队列:

这里我们需要来弄清楚这几个队列是如何实现的。