1
1
# Kubernetes 事件队列
2
2
3
+ <!-- TOC -->
4
+
5
+ - [ Kubernetes 事件队列] ( #kubernetes-事件队列 )
6
+ - [ Queue 是添加了 Pop 方法的 Store] ( #queue-是添加了-pop-方法的-store )
7
+ - [ FIFO 实现了 Queue/Store 接口] ( #fifo-实现了-queuestore-接口 )
8
+ - [ ` Add() ` 方法] ( #add-方法 )
9
+ - [ ` Update() ` 方法] ( #update-方法 )
10
+ - [ ` Delete() ` 方法] ( #delete-方法 )
11
+ - [ ` Pop() ` 方法] ( #pop-方法 )
12
+ - [ ` Replace() ` 方法] ( #replace-方法 )
13
+ - [ HasSyncd() 方法] ( #hassyncd-方法 )
14
+ - [ ` Resync() ` 方法] ( #resync-方法 )
15
+ - [ DeltaFIFO 是可以记录对象操作事件的 FIFO] ( #deltafifo-是可以记录对象操作事件的-fifo )
16
+ - [ DeltaFIFO 的生产者和消费者] ( #deltafifo-的生产者和消费者 )
17
+ - [ 记录对象事件的 Delta、Deltas 和 DeletedFinalStateUnknown 类型] ( #记录对象事件的-deltadeltas-和-deletedfinalstateunknown-类型 )
18
+ - [ Add() 和 Update() 方法] ( #add-和-update-方法 )
19
+ - [ queueActionLocked() 方法] ( #queueactionlocked-方法 )
20
+ - [ Delete() 方法] ( #delete-方法 )
21
+ - [ Get/GetByKey/List/ListKeys() 方法] ( #getgetbykeylistlistkeys-方法 )
22
+ - [ Replace() 方法] ( #replace-方法 )
23
+ - [ Resync() 方法] ( #resync-方法 )
24
+ - [ Pop() 方法] ( #pop-方法 )
25
+ - [ HasSyncd() 方法] ( #hassyncd-方法-1 )
26
+ - [ DeltaFIFO 和 knownObjects 对象缓存的同步] ( #deltafifo-和-knownobjects-对象缓存的同步 )
27
+
28
+ <!-- /TOC -->
29
+
3
30
Queue 接口是在 ` Store ` 的基础上,添加了 ` Pop() ` 方法。
4
31
5
32
FIFO 和 DeltaFIFO 类型(非接口)实现了 ` Queue ` 接口。
@@ -320,7 +347,7 @@ func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
320
347
2 . 根据配置的 Resync 时间,** 周期调用** DeltaFIFO 的 ` Resync() ` 方法(见后文),将 knownObjects 中的对象更新到 DeltaFIFO 中;
321
348
3 . Watch etcd,这是阻塞式的,根据事件的类型分别调用 DeltaFIFO 的 Add/Update/Delete 方法,将对象更新到 DeltaFIFO;
322
349
323
- Watch etcd 会** 周期性的** 超时,这时 ` ListAndWatch() ` 出错返回,Reflector 会等待一段时间再执行它,从而实现** 周期的将 ` etcd ` 中的特定类型的全部对象 ** 同步到 ` DeltaFIFO ` 。
350
+ Watch etcd 会** 周期性的** 超时,这时 ` ListAndWatch() ` 出错返回,Reflector 会等待一段时间再执行它,从而实现** 周期的将 ` etcd ` 中特定类型的全部对象 ** 同步到 ` DeltaFIFO ` 。
324
351
325
352
** ` controller ` 是 ` DeltaFIFO ` 的消费者,它用 DeltaFIFO 弹出的对象更新 ` knownObjects ` 缓存,然后调用注册的 OnUpdate/OnAdd/OnDelete 回调函数** 。详情参考 [ Reflector] ( 3.reflector.md ) 和 [ controller 和 Informer] ( 4.controller-informer.md ) 文档。
326
353
@@ -419,7 +446,7 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
419
446
1 . 遍历 list 中的对象,为每个对象生成 ` Sync ` 事件;
420
447
2 . 遍历 f.knownObjects.ListKeys() 中的对象,对于不在传入的 list 中的对象,生成 ` Deleted ` 事件,对象类型为 ` DeletedFinalStateUnknown ` (非 Delta 类型);
421
448
422
- Reflector 的 ` ListAndWatch() ` 因 Watch 超时而周期调用 Replace() 方法,将 etcd 中的特定类型的所有对象同步到 DeltaFIFO 中。` controller ` 用 DeltaFIFO 弹出的对象更新 ` knownObjects ` 缓存,详情参考 [ Reflector] ( 3.reflector.md ) 和 [ controller 和 Informer] ( 4.controller-informer.md ) 文档。
449
+ Reflector 的 ` ListAndWatch() ` 因 Watch 超时而周期调用 Replace() 方法,从而周期地将 etcd 中特定类型的所有对象同步到 DeltaFIFO 中。` controller ` 用 DeltaFIFO 弹出的对象更新 ` knownObjects ` 缓存,详情参考 [ Reflector] ( 3.reflector.md ) 和 [ controller 和 Informer] ( 4.controller-informer.md ) 文档。
423
450
424
451
### Resync() 方法
425
452
@@ -432,17 +459,17 @@ Reflector 的 `ListAndWatch()` 因 Watch 超时而周期调用 Replace() 方法
432
459
433
460
Reflector 的 ` ListAndWatch() ` 方法周期执行 DeltaFIFO 的 Resync() 方法,目的就是** 为对象产生新的 Sync 事件** ,从而有机会再次调用注册的 ` OnUpdate() ` 处理函数。因此 Resync 时,如果对象已经在 f.items,则后续因有机会被弹出,所以不需要为它生成 Sync 事件。
434
461
435
- ** 只有 Replace() 和 Rsync() 方法会产生 Sync 事件** 。
462
+ ** 只有 Replace() 和 Rsync() 方法才产生 Sync 事件** 。
436
463
437
464
### Pop() 方法
438
465
439
466
Pop(process PopProcessFunc)
440
467
441
468
1 . 如果弹出队列 f.queue 为空,则阻塞等待;
442
- 2 . 每次弹出队列头部的对象对应的事件列表 (Deltas 类型),然后将该对象从事件列表缓存 (f.items)中删除;
443
- 3 . 调用配置的回调函数 PopProcessFunc, 传入事件列表 Deltas;
469
+ 2 . 每次弹出队列头部对象的事件列表 (Deltas 类型),然后将该对象的事件列表从缓存 (f.items)中删除;
470
+ 3 . 调用配置的回调函数 PopProcessFunc( 传入事件列表 Deltas) ;
444
471
445
- 如果函数 PopProcessFunc 执行失败,应该再调用 ` AddIfNotPresent() ` 方法将 Deltas 重新加回 DeltaFIFO,这样后续可以再次被弹出处理,防止丢事件。(controler 已实现该逻辑)
472
+ 如果函数 PopProcessFunc 执行失败,应该调用 ` AddIfNotPresent() ` 方法将 Deltas 重新加回 DeltaFIFO,这样后续可以再次被弹出处理,防止丢事件。(controler 已实现该逻辑)
446
473
447
474
### HasSyncd() 方法
448
475
0 commit comments