<aside> 🤡 本文主要对 SharedInformer 组件进行分析说明。
</aside>
上节课我们分析了 Indexer 组件的实现,实际上最开始的时候我们在 Informer 示例中通过 Informer 的 Lister 获取的资源对象数据就来自于 Indexer,当然除了 Lister 之外最重要的就是资源对象事件监听的操作,这些都是在 SharedInformer 中去实现的,所以我们需要去分析下 SharedInformer 的实现,这样就可以完整的将前面的内容串联起来了。
我们平时说的 Informer 其实就是 SharedInformer,它是可以共享使用的。如果同一个资源的 Informer 被实例化多次,那么就会运行多个 ListAndWatch 操作,这会加大 APIServer 的压力。而 SharedInformer 通过一个 map 来让同一类资源的 Informer 实现共享一个 Refelctor,这样就不会出现上面这个问题了。接下来我们先来查看 SharedInformer 的具体实现:
// k8s.io/client-go/tools/cache/shared_informer.go
type SharedInformer interface {
// 添加资源事件处理器,当有资源变化时就会通过回调通知使用者
AddEventHandler(handler ResourceEventHandler)
// 需要周期同步的资源事件处理器
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// 获取一个 Store 对象,前面我们讲解了很多实现 Store 的结构
GetStore() Store
// 获取一个 Controller,下面会详细介绍,主要是用来将 Reflector 和 DeltaFIFO 组合到一起工作
GetController() Controller
// SharedInformer 的核心实现,启动并运行这个 SharedInformer
// 当 stopCh 关闭时候,informer 才会退出
Run(stopCh <-chan struct{})
// 告诉使用者全量的对象是否已经同步到了本地存储中
HasSynced() bool
// 最新同步资源的版本
LastSyncResourceVersion() string
}
// 在 SharedInformer 基础上扩展了添加和获取 Indexers 的能力
type SharedIndexInformer interface {
SharedInformer
// 在启动之前添加 indexers 到 informer 中
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
如果我们要处理资源的事件的话,就需要添加一个事件处理器,传入一个 ResourceEventHandler
接口,其定义如下所示:
// k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandler interface {
// 添加对象回调函数
OnAdd(obj interface{})
// 更新对象回调函数
OnUpdate(oldObj, newObj interface{})
// 删除对象回调函数
OnDelete(obj interface{})
}
然后接下来我们来看看 SharedIndexInformer
的具体实现类的定义:
// k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
// Indexer也是一种Store,这个我们知道的,Controller负责把Reflector和FIFO逻辑串联起来
// 所以这两个变量就涵盖了开篇那张图里面的Reflector、DeltaFIFO和LocalStore(cache)
indexer Indexer
// 在 Controller 中将 Reflector 和 DeltaFIFO 关联了起来
controller Controller
// 对 ResourceEventHandler 进行了一层层封装,统一由 sharedProcessor 管理
processor *sharedProcessor
// 监控对象在一个时间窗口内是否发生了变化
cacheMutationDetector MutationDetector
// 用于 Reflector 中真正执行 ListAndWatch 的操作
listerWatcher ListerWatcher
// informer 中要处理的对象
objectType runtime.Object
// 定期同步周期
resyncCheckPeriod time.Duration
// 任何通过 AddEventHandler 添加的处理程序的默认重新同步的周期
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
// 启动、停止标记
started, stopped bool
startedLock sync.Mutex
blockDeltas sync.Mutex
}
上面我们看到在 sharedIndexInformer 中定义了一个 Controller,这里的 Controller 并不是我们比较熟悉的 kube-controller-manager 管理的各种控制器,这里的 Controller 定义在 client-go/tools/cache/controller.go
中,目的是用来把 Reflector、DeltaFIFO 这些组件组合起来形成一个相对固定的、标准的处理流程。我们先来看下 Controller 的定义:
// k8s.io/client-go/tools/cache/controller.go
// Controller 的抽象接口
type Controller interface {
// Run 函数主要做两件事,一件是构造并运行一个 Reflector 反射器,将对象/通知从 Config 的
// ListerWatcher 送到 Config 的 Queue 队列,并在该队列上调用 Resync 操作
// 另外一件事就是不断从队列中弹出对象,并使用 Config 的 ProcessFunc 进行处理
Run(stopCh <-chan struct{})
HasSynced() bool // APIServer 中的资源对象是否同步到了 Store 中
LastSyncResourceVersion() string // 最新的资源版本号
}
因为 Controller 把多个模块整合起来实现了一套业务逻辑,所以在创建Controller 的时候需要提供一些配置:
// k8s.io/client-go/tools/cache/controller.go
type Config struct {
Queue // 资源对象的队列,其实就是一个 DeltaFIFO
ListerWatcher // 用来构造 Reflector 的
Process ProcessFunc // DeltaFIFO 队列 Pop 的时候回调函数,用于处理弹出的对象
ObjectType runtime.Object // 对象类型,也就是 Reflector 中使用的
FullResyncPeriod time.Duration // 全量同步周期,在 Reflector 中使用
ShouldResync ShouldResyncFunc // Reflector 中是否需要 Resync 操作
RetryOnError bool // 出现错误是否需要重试
}
Controller 自己构造 Reflector 获取对象,Reflector 作为 DeltaFIFO 生产者持续监控 APIServer 的资源变化并推送到队列中。Controller 的 Run() 就是是队列的消费者,从队列中弹出对象并调用 Process() 进行处理。接下来我们来看 Controller 的一个具体实现 controller:
// k8s.io/client-go/tools/cache/controller.go
// controller是 Controller 的一个具体实现
type controller struct {
config Config // 配置
reflector *Reflector // 反射器
reflectorMutex sync.RWMutex // 反射器的锁
clock clock.Clock // 时钟
}
// 控制器核心实现
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 新建一个协程,如果收到系统退出的信号就关闭队列
go func() {
<-stopCh
c.config.Queue.Close()
}()
// 实例化一个 Reflector,传入的参数都是从 Config 中获取的
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
// 将反射器给到controller
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
// 等待所有协程执行完毕
var wg wait.Group
defer wg.Wait()
// StartWithChannel 会启动协程执行 Reflector.Run(),接收到 stopCh 信号才会退出协程
wg.StartWithChannel(stopCh, r.Run)
// wait.Unitl() 就是周期性的调用 c.processLoop() 操作处理弹出的对象
wait.Until(c.processLoop, time.Second, stopCh)
}
从上面的核心函数 Run 的实现方式来看,该函数中主要就是实例化一个 Reflector,然后启动一个协程去执行这个反射器的 Run 函数,这个 Run 函数前面我们已经讲解过就是去调用 ListAndWatch
函数进行 List 和 Watch 操作,这个操作中具体的实现就是 Config 中的 ListerWatcher
。然后的一个核心就是 processLoop() 函数的实现:
// k8s.io/client-go/tools/cache/controller.go
// 处理队列弹出的对象
func (c *controller) processLoop() {
// 死循环,不断从队列中弹出对象来处理
for {
// 从队列中弹出一个对象,然后处理这个对象
// 真正处理的是通过 Config 传递进来的 Process 函数
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
// 如果队列关闭了那就直接退出了
if err == ErrFIFOClosed {
return
}
// 如果配置的是错误后允许重试
if c.config.RetryOnError {
// 如果错误可以再重试那么将弹出的对象重新入队列进行处理
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}