- Informer
Inforer 的主要使用场景是自定义 Controller,它需要从 apiserver List/Watch 特定类型的资源对象的所有事件,缓存到内存,供后续快速查找,根据事件类型,调用用户注册的回调函数。
[前面分析了] (3.listwatch-reflector-controller.md) NewInformer()、NewIndexInformer() 函数使用 controller 的 Reflector List/Watch 特定资源类型的对象,缓存到本地,并调用用户设置的 OnAdd/OnUpdate/OnDelete 回调函数(保存在 ResourceEventHandler 中)。
这两个函数返回的 Store 和 Index 都缓存了从 apiserver List/Watch 更新的资源类型对象,并保持与 etcd 同步。另外,可以使用 Index 创建 Lister,进而更方便的从本地缓存中 List 和 Get 符合条件的资源对象。
这两个函数只能注册一组 OnAdd/OnUpdate/OnDelete 回调函数,如果需要注册多组回调函数(例如 kube-controller-manager)但它们又共享一份 Indexer 缓存,则可以使用 SharedInformer 或 SharedIndexInformer。
SharedInformer 提供一个共享的对象缓存,并且可以将缓存中对象的变化事件(Add/Update/Deleted)分发给多个通过 AddEventHandler() 方法添加的 listeners。需要注意的是通知事件包含的对象与缓存中的对象可能不一致,但是缓存中的对象一定是最新的,例如:同一个对象先后发送了两个 Update 事件,这时缓存中时最新 Update 的结果,但是先处理 Update 事件的回调函数从事件中拿到的对象时旧的。如果现实 Update 再 Deleted,则 Update 的回调函数可能从缓存中找不到对象。
另外,SharedInformer 内部使用循环队列,先将对象从 DeltaFIFO Pop 出来,然后再调用回调函数。
SharedInformer 和 SharedIndexInformer 一般和 workqueue 同时使用,具体参考:8.customize-controller.md
在分析 SharedInformer 和 SharedIndexInformer 之前,先分析它使用的 processorListener 和 sharedProcessor 结构类型。
processorListener 封装了监听器处理函数 ResourceEventHandler 以及 RingGrowing 类型的循环队列。
它通过 addCh Channel 接收对象,保存到循环队列 pendingNotifications 中缓存(队列大),然后 pop 到 nextCh,最后 run() 方法获得该对象,调用监听器函数。
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
type processorListener struct {
// nextCh 保存从 pendingNotifications.ReadOne() 读取的对象;
nextCh chan interface{}
// addCh 用于接收 add() 方法发送的对象,pop 方法读取它后 endingNotifications.WriteOne(notificationToAdd) 该对象;
addCh chan interface{}
// 用户实际配置的回调函数
handler ResourceEventHandler
// 循环队列,默认缓存 1024 个对象,从而提供了事件缓冲的能力
pendingNotifications buffer.RingGrowing
// 创建 listner 时,用户指定的 Resync 周期
requestedResyncPeriod time.Duration
// 该 listener 实际使用的 Resync 周期,一般是所有 listner 周期的最小值,所以可能与 requestedResyncPeriod 不同
resyncPeriod time.Duration
// nextResync is the earliest time the listener should get a full resync
nextResync time.Time
// resyncLock guards access to resyncPeriod and nextResync
resyncLock sync.Mutex
}
add() 方法将通知对象写入到 p.addCh Channel 中,如果 pendingNotifications 未满(默认 1024)则可以直接写入,而不需要等待 ResourceEventHandler 处理完毕。
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
pop() 方法是 processorListener 的核心方法,它实现了:
- 从 p.addCh Channel 读取数据,存入循环队列 p.pendingNotifications;
- 从 循环队列 p.pendingNotifications 取数据,写入 p.nextCh,供后续 run() 方法读取;
run() 从 p.nextCh 读取对象,然后调用用户的处理函数,执行时间可能较长,而 pop() 方法通过 channel selector 机制以及循环队列,巧妙地实现了异步非阻塞的写入和读取对象。
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
// 先将对象写入 p.nextCh,然后从循环对内中读取一个对象,下一次执行另一个 case 时写入 p.nextCh
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
// 从 p.addCh 读取一个待加入的对象,然后看是否有待通知的对象,如果有则设置好通知返回,否则写入循环队列
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
run() 方法从 p.nextCh channel 中获取通知对象,然后根据对象的类型调用注册的监听器函数。
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
sharedProcessor 类型封装了多个 processorListener,用于表示多组用户注册的通知函数。
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
type sharedProcessor struct {
// 标记 processor 是否启动
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
addListener() 方法用于向 Processor 添加新的、封装了用户处理函数的 listener,如果 process 的 run() 方法已经在运行,则启动 listener。
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.addListenerLocked(listener)
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
p.listeners = append(p.listeners, listener)
p.syncingListeners = append(p.syncingListeners, listener)
}
遍历 listeners,调用他们的 add() 方法添加对象。如果 obj 的事件类型是 Sync,则 sync 为 true。
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
运行已经注册的 listeners。
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
根据已经注册的所有 listerns,判断所有需要 syncing 的 listeners。
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) shouldResync() bool {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.syncingListeners = []*processorListener{}
resyncNeeded := false
now := p.clock.Now()
for _, listener := range p.listeners {
// need to loop through all the listeners to see if they need to resync so we can prepare any
// listeners that are going to be resyncing.
if listener.shouldResync(now) {
resyncNeeded = true
p.syncingListeners = append(p.syncingListeners, listener)
listener.determineNextResync(now)
}
}
return resyncNeeded
}
func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
listener.setResyncPeriod(resyncPeriod)
}
}
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
type SharedInformer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers.
AddEventHandler(handler ResourceEventHandler)
// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
// specified resync period. Events to a single handler are delivered sequentially, but there is
// no coordination between different handlers.
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// GetStore returns the Store.
GetStore() Store
// GetController gives back a synthetic interface that "votes" to start the informer
GetContrtarts the shared informer, which will be stopped when stopCh is closed.
Run(stopoller() Controller
// Run sCh <-chan struct{})
// HasSynced returns true if the shared informer's store has synced.
HasSynced() bool
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
LastSyncResourceVersion() string
}
type SharedIndexInformer interface {
SharedInformer
// AddIndexers add indexers to the informer before it starts.
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
函数 NewSharedInformer() 和 NewSharedIndexInformer() 分别返回实现这两个接口的对象 sharedIndexInformer 实例:
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
// NewSharedInformer creates a new instance for the listwatcher.
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
}
// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
// indexer 即为对象缓存
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: objType,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
clock: realClock,
}
return sharedIndexInformer
}
- 传给 NewSharedIndexInformer () 的 indexers 一般是
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
,然后用DeletionHandlingMetaNamespaceKeyFunc
作为对象的 KeyFunc 创建 Indexer 缓存;
后文会介绍,一般情况下,我们不需要使用 NewSharedInformer() 和 NewSharedIndexInformer() 函数为特定资源类型创建 SharedInformer,而是使用 codegen 为特定资源类型创建的 NewXXXInformer() 和 NewFilteredXXXInformer() 函数来创建。
这两个 Informer 包含:
- 带索引的对象缓存 Indexer;
- 内置 Controller,它根据 ListerWatcher、resyncCheckPeriod、objectType 等参数创建 Relfector;
- Controller 的 Process 函数为 Infomer 的 HandleDeltas() 方法,后者负责向多组 listener 发送事件;
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
// indexer 为对象缓存,一般是 NewIndexer() 函数创建的,如:NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
indexer Indexer
// 从 apiserver List/Watch 对象的 Controller
controller Controller
// 可以注册多组监听器的共享 processor
processor *sharedProcessor
cacheMutationDetector CacheMutationDetector
// 创建 controller 使用的 ListerWatcher 和对象类型
listerWatcher ListerWatcher
objectType runtime.Object
// 指定 Reflector 的将对象从 indexer 同步到 DeltaFIFO 的周期
// 一般是所有注册的监听函数指定的**最小值**:每次注册监听函数组时,都会比较传入的 period 是否比这个值小,如果小,则使用传入的值 period 设置该值;
resyncCheckPeriod time.Duration
// 缺省的 ResyncPeriod。使用 AddEventHandler() 方法添加 listener 时,使用该 ResyncPeriod
defaultEventHandlerResyncPeriod time.Duration
// clock allows for testability
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync.Mutex
}
sharedIndexInformer 也是基于 Controller 实现,Run() 方法:
- 创建对象历史事件缓存 DeltaFIFO;
- 创建 Controller,它的 Process 函数是支持向多个 listener 派发对象事件的 s.HandleDeltas() 方法;
- 启动 processer,用于处理分发对象事件;
- 启动 Controller,进而启动 Reflector 和 Pop 处理过程;
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 传入对象缓存 s.indexer(参考 NewSharedIndexInformer() 函数,一般是 NewIndexer() 函数创建的),创建 DeltaFIFO
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
// 使用 DeltaFIFO 创建 Controller 的配置
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
// Controller 的处理函数是 s.HandleDeltas() 方法
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 创建 Controller
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
// 启动 processer,用于处理分发对象事件
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
// 运行 Controller
s.controller.Run(stopCh)
}
该方法在写自定义 Controller 时经常用到,用于判断 Informer Cache 是否完成同步,然后再启动 workqueue 的 worker,具体参考:3.listwatch-reflector-controller.md
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HasSynced() bool {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.controller == nil {
return false
}
return s.controller.HasSynced()
}
该方法用于注册监听函数,使用默认的同步周期,可以调用多次,从而注册多组监听函数:
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
// 参考后文对 AddEventHandlerWithResyncPeriod 的分析
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
结构体类型 ResourceEventHandlerFuncs
和 FilteringResourceEventHandler
实现了 ResourceEventHandler
接口:
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
type FilteringResourceEventHandler struct {
FilterFunc func(obj interface{}) bool
Handler ResourceEventHandler
}
FilteringResourceEventHandler 在调用 Handler 的方法前,会先用 FilterFunc 对对象进行过滤,通过过滤的对象才调用 Handler 处理。
实际例子:
// 来源于:https://github.com/kubernetes/sample-controller/blob/master/controller.go
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueFoo,
UpdateFunc: func(old, new interface{}) {
controller.enqueueFoo(new)
},
})
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*appsv1.Deployment)
oldDepl := old.(*appsv1.Deployment)
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
// Periodic resync will send update events for all known Deployments.
// Two different versions of the same Deployment will always have different RVs.
return
}
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
对于 DeleteFunc() 函数而言,传入的 obj 可能是 K8S 资源对象,也可能说 DeletedFinalStateUnknown 类型对象,所以它需要区分:
// 来源于:https://github.com/kubernetes/sample-controller/blob/master/controller.go
// handleObject 是 DeleteFunc 指定的函数
func (c *Controller) handleObject(obj interface{}) {
var object metav1.Object
var ok bool
// 对接口类型的 obj 类型进行断言,先是 K8S 资源类型
if object, ok = obj.(metav1.Object); !ok {
// 如果失败,则可能是 DeletedFinalStateUnknown 类型对象
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
object, ok = tombstone.Obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
return
}
klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
}
...
}
该方法实际负责注册监听函数,resyncPeriod 参数用于指定需求的 DeltaFIFO 同步周期。
该放的主要工作如下:
- 比较传本次传入的 resyncPeriod 与当前全局同步周期(s.resyncCheckPeriod),使用较小的值作为全局同步周期。
- 创建监听器;
- 注册监听器;
如果在 sharedIndexInformer 已经 Run 的情况下注册监听器,则该访问将遍历对象缓存,为所有对象生成 Add 事件,调用传入的事件处理函数。 这样可以保证即使后注册监听器,也不会丢失对象的历史事件。
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
// DeltaFIFO 的最小同步周期是 1s
const minimumResyncPeriod = 1 * time.Second
const initialBufferSize = 1024
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.stopped {
klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
return
}
if resyncPeriod > 0 {
if resyncPeriod < minimumResyncPeriod {
klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
resyncPeriod = minimumResyncPeriod
}
if resyncPeriod < s.resyncCheckPeriod {
if s.started {
klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
resyncPeriod = s.resyncCheckPeriod
} else {
// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
// accordingly
s.resyncCheckPeriod = resyncPeriod
s.processor.resyncCheckPeriodChanged(resyncPeriod)
}
}
}
// 创建监听器
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
// informer 未启动时,直接注册监听器即可
if !s.started {
// 注册监听器
s.processor.addListener(listener)
return
}
// 如果 informer 已经启动,为了安全地 Join:
// 1. stop sending add/update/delete notifications
// 2. do a list against the store
// 3. send synthetic "Add" events to the new handler
// 4. unblock
// 获取发送事件的锁(因为后续要遍历对象缓存,加锁的话,可以防止其它 goroutine 更新对象缓存)
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// 注册监听器
s.processor.addListener(listener)
// 遍历对象缓存中的各对象,为它们生成 Add 事件,然后发送给新的 Handler;
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
}
sharedIndexInformer 内置的 Controller 从 DeltaFIFO Pop 出对象(Deltas 类型)时调用该方法:
- 遍历 Deltas 中的 Delta;
- 根据 Delta 的事件类型,更新缓存,分发对应的事件,从而执行各监听器函数;
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
// Controller 传入的 obj 实际是从 Reflector.Pop() 方法返回的 Deltas,它是 Delta 的列表
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
// 查找对象缓存,看是否存在该对象
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// 如果存在则更新缓存,
if err := s.indexer.Update(d.Object); err != nil {
return err
}
// 分发 update 事件
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// 缓存中没有该对象,则添加到缓存
if err := s.indexer.Add(d.Object); err != nil {
return err
}
// 分发 add 事件
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
该方法用于等待 stopCh 关闭(返回的 err 不为 nil),或者 cacheSyncs 列表的所有 InformerSynced 类型的函数都返回 true(返回的 err 为 nil):
// 来源于:k8s.io/client-go/tools/cache/shared_informer.go
syncedPollPeriod = 100 * time.Millisecond
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
// wait.PollUntil 会每隔 100ms 执行匿名函数,直到它返回 true,或者 stopCh 被关闭
err := wait.PollUntil(syncedPollPeriod,
func() (bool, error) {
for _, syncFunc := range cacheSyncs {
if !syncFunc() {
return false, nil
}
}
return true, nil
},
stopCh)
if err != nil {
klog.V(2).Infof("stop requested")
return false
}
klog.V(4).Infof("caches populated")
return true
}
- 返回 true 时表示 Informer Cache 都 Synced;
- 返回 false 时表示 stopCh 被管理;
实例例子:
// 来源于:https://github.com/kubernetes/sample-controller/blob/master/controller.go
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
...
// c.deploymentsSynced = deploymentInformer.Informer().HasSynced
if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
...
}
一般情况下,我们不需要使用 NewSharedInformer() 和 NewSharedIndexInformer() 函数为特定资源类型创建 SharedInformer,而是使用 codegen 为特定资源类型创建的 NewXXXInformer() 和 NewFilteredXXXInformer() 函数来创建。
// 来源于:k8s.io/client-go/informers/apps/v1/deployment.go
func NewDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
return NewFilteredDeploymentInformer(client, namespace, resyncPeriod, indexers, nil)
}
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AppsV1().Deployments(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AppsV1().Deployments(namespace).Watch(options)
},
},
&appsv1.Deployment{},
resyncPeriod,
indexers,
)
}
func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}
func (f *deploymentInformer) Lister() v1.DeploymentLister {
return v1.NewDeploymentLister(f.Informer().GetIndexer())
}
-
client 一般是 client-go 的 kubernets 或 CRD 的 clientset,如:
// 来源于:https://github.com/kubernetes/sample-controller/blob/master/main.go cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) if err != nil { klog.Fatalf("Error building kubeconfig: %s", err.Error()) } kubeClient, err := kubernetes.NewForConfig(cfg) if err != nil { klog.Fatalf("Error building kubernetes clientset: %s", err.Error()) } exampleClient, err := clientset.NewForConfig(cfg) if err != nil { klog.Fatalf("Error building example clientset: %s", err.Error()) }
-
传给 NewFilteredDeploymentInformer() 函数的 indexers 一般是
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
。
一般情况下,我们不直接使用上面的 NewXXX 函数创建各资源类型的 SharedInformer,而是使用 codegen 生成的 sharedInformerFactory 来创建它们,具体参考:6.sharedInformerFactory.md