Skip to content

Commit 0d35150

Browse files
author
Zhang Jun
committed
update
1 parent a321095 commit 0d35150

File tree

4 files changed

+1031
-25
lines changed

4 files changed

+1031
-25
lines changed

client-go/2.queue-fifo-delta_fifo.md

+25-25
Original file line numberDiff line numberDiff line change
@@ -172,15 +172,15 @@ func (f *FIFO) Delete(obj interface{}) error {
172172
}
173173
```
174174

175-
注意:**没有**从弹出队列(f.queue) 中移除对象,后续弹出该对象时会**忽略**这种情况,继续弹出下一个对象。
175+
注意:**没有**从弹出队列(f.queue) 中删除该对象 Key,后续还会弹出这个已删除对象的 Key,但由于缓存中没有该对象,故会**忽略**该对象 Key,继续弹出下一个对象。
176176

177177
### Pop() 方法
178178

179-
从弹出队列(f.queue)弹出一个对象,调用用户注册的回调函数进行处理,返回处理后的对象和出错信息如果队列为空,则一直阻塞。
179+
从弹出队列(f.queue)弹出一个对象,调用用户注册的回调函数进行处理,返回处理后的对象和出错信息如果队列为空,则一直阻塞。
180180

181181
处理函数执行失败时应该返回 `ErrRequeue` 类型的错误,这时该对象会被**重新加回** FIFO,后续可以再次被弹出处理。
182182

183-
`Pop()` 是在对 FIFO 加锁的情况下调用处理函数的,所以可以在多个 goroutine 中**并发调用** 调用该方法
183+
`Pop()` 是在对 FIFO 加锁的情况下调用处理函数的,所以可以在多个 goroutine 中**并发调用** 该方法
184184

185185
``` go
186186
// 来源于 k8s.io/client-go/tools/cache/fifo.go
@@ -224,7 +224,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
224224

225225
### Replace() 方法
226226

227-
用传入的一组对象替换对象缓存 f.items 和弹出队列 f.queue(未弹出的对象被忽略)
227+
用传入的一组对象替换对象缓存 f.items 和弹出队列 f.queue:
228228

229229
``` go
230230
// 来源于 k8s.io/client-go/tools/cache/fifo.go
@@ -527,25 +527,25 @@ func (f *DeltaFIFO) HasSynced() bool {
527527

528528
后续文章会介绍,`controller` 一般是在 `Informer` 中使用的,`controller` 调用的 `OnUpdate()` 函数会调用用户创建 `Informer` 时传入的 `ResourceEventHandler` 中的 `OnUpdate()` 函数。所以用户的 `OnUpdate()` 函数可能会因 DeltaFIFO 的周期 Resync() 而被调用,它应该检查传入的 old、new 对象是否发生变化,未变化时直接返回:
529529

530-
``` go
531-
// 来源于 https://github.com/kubernetes/sample-controller/blob/master/controller.go#L131
532-
deployInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
533-
// 第一次从 etcd 同步的对象会生成 Added 事件,调用该函数
534-
// 后续 Added 事件就代表的确有**新创建**的资源对象;
535-
AddFunc: controller.handleDeploy,
536-
UpdateFunc: func(old, new interface{}) {
537-
// 由于 Reflector 周期调用 DeltaFIFO 的 Rsync() 方法,`controller` 会调用注册的 OnUpdate 回调函数,所以需要判断新旧对象是否相同,如果相同则不需处理
538-
// 也可以用
539-
newDepl := new.(*extensionsv1beta1.Deployment)
540-
oldDepl := old.(*extensionsv1beta1.Deployment)
541-
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
542-
// Periodic resync will send update events for all known Deployments.
543-
// Two different versions of the same Deployment will always have different RVs.
544-
return
545-
}
546-
controller.handleDeploy(new)
547-
},
548-
DeleteFunc: controller.handleDeploy,
549-
})
550-
```
530+
``` go
531+
// 来源于 https://github.com/kubernetes/sample-controller/blob/master/controller.go#L131
532+
deployInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
533+
// 第一次从 etcd 同步的对象会生成 Added 事件,调用该函数
534+
// 后续 Added 事件就代表的确有**新创建**的资源对象;
535+
AddFunc: controller.handleDeploy,
536+
UpdateFunc: func(old, new interface{}) {
537+
// 由于 Reflector 周期调用 DeltaFIFO 的 Rsync() 方法,`controller` 会调用注册的 OnUpdate 回调函数,所以需要判断新旧对象是否相同,如果相同则不需处理
538+
// 也可以用
539+
newDepl := new.(*extensionsv1beta1.Deployment)
540+
oldDepl := old.(*extensionsv1beta1.Deployment)
541+
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
542+
// Periodic resync will send update events for all known Deployments.
543+
// Two different versions of the same Deployment will always have different RVs.
544+
return
545+
}
546+
controller.handleDeploy(new)
547+
},
548+
DeleteFunc: controller.handleDeploy,
549+
})
550+
```
551551
另外,前面的第 2 条提到过,`controller` 刚启动时,knownObjects 为空,会为从 etcd 同步来的特定类型的所有对象生成 Added 事件,进而调用上面注册的 AddFunc

client-go/3.listwatch-reflector.md

+248
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
# kubernetes 事件反射器
2+
3+
Reflector 是 Kubernetes 的事件反射器,它 List 和 Watch etcd 中的资源对象变化,将其更新到内部的 DeltaFIFO 中。
4+
5+
在介绍 Relfector 前,先介绍 Reflector 使用的 ListerWatcher 接口;
6+
7+
func NewIndexerInformer(
8+
lw ListerWatcher,
9+
objType runtime.Object,
10+
resyncPeriod time.Duration,
11+
h ResourceEventHandler,
12+
indexers Indexers,
13+
) (Indexer, Controller)
14+
15+
-> 创建 controller
16+
// Config contains all the settings for a Controller.
17+
type Config struct {
18+
...
19+
// Something that can list and watch your objects.
20+
ListerWatcher
21+
...
22+
}
23+
24+
-> 创建 Reflector(c.config.ListerWatcher 是传给 NewIndexerInformer()的 lw):
25+
func (c *controller) Run(stopCh <-chan struct{}) {
26+
...
27+
r := NewReflector(
28+
c.config.ListerWatcher,
29+
c.config.ObjectType,
30+
c.config.Queue,
31+
c.config.FullResyncPeriod,
32+
)
33+
...
34+
c.reflector = r
35+
...
36+
}
37+
38+
-> ListAndWatch
39+
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
40+
options := metav1.ListOptions{ResourceVersion: "0"}
41+
list, err := r.listerWatcher.List(options)
42+
...
43+
for {
44+
options = metav1.ListOptions{
45+
ResourceVersion: resourceVersion,
46+
TimeoutSeconds: &timeoutSeconds,
47+
}
48+
...
49+
w, err := r.listerWatcher.Watch(options)
50+
...
51+
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
52+
...
53+
}
54+
}
55+
}
56+
57+
## ListAndWatch
58+
59+
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
60+
type ListerWatcher interface {
61+
// List should return a list type object; the Items field will be extracted, and the
62+
// ResourceVersion field will be used to start the watch in the right place.
63+
List(options metav1.ListOptions) (runtime.Object, error)
64+
// Watch should begin a watch at the specified version.
65+
Watch(options metav1.ListOptions) (watch.Interface, error)
66+
}
67+
68+
// ListFunc knows how to list resources
69+
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
70+
71+
// WatchFunc knows how to watch resources
72+
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
73+
74+
// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
75+
// It is a convenience function for users of NewReflector, etc.
76+
// ListFunc and WatchFunc must not be nil
77+
type ListWatch struct {
78+
ListFunc ListFunc
79+
WatchFunc WatchFunc
80+
// DisableChunking requests no chunking for this list watcher.
81+
DisableChunking bool
82+
}
83+
84+
// Getter interface knows how to access Get method from RESTClient.
85+
type Getter interface {
86+
Get() *restclient.Request
87+
}
88+
89+
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
90+
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
91+
optionsModifier := func(options *metav1.ListOptions) {
92+
options.FieldSelector = fieldSelector.String()
93+
}
94+
return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
95+
}
96+
97+
// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
98+
// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
99+
// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
100+
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
101+
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
102+
optionsModifier(&options)
103+
return c.Get().
104+
Namespace(namespace).
105+
Resource(resource).
106+
VersionedParams(&options, metav1.ParameterCodec).
107+
Do().
108+
Get()
109+
}
110+
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
111+
options.Watch = true
112+
optionsModifier(&options)
113+
return c.Get().
114+
Namespace(namespace).
115+
Resource(resource).
116+
VersionedParams(&options, metav1.ParameterCodec).
117+
Watch()
118+
}
119+
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
120+
}
121+
122+
// List a set of apiserver resources
123+
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
124+
if !lw.DisableChunking {
125+
return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)
126+
}
127+
return lw.ListFunc(options)
128+
}
129+
130+
// Watch a set of apiserver resources
131+
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
132+
return lw.WatchFunc(options)
133+
}
134+
135+
##
136+
137+
``` go
138+
func (r *Reflector) Run(stopCh <-chan struct{}) {
139+
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
140+
wait.Until(func() {
141+
if err := r.ListAndWatch(stopCh); err != nil {
142+
utilruntime.HandleError(err)
143+
}
144+
}, r.period, stopCh)
145+
}
146+
```
147+
148+
``` go
149+
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
150+
...
151+
resourceVersion = listMetaInterface.GetResourceVersion()
152+
items, err := meta.ExtractList(list)
153+
...
154+
if err := r.syncWith(items, resourceVersion); err != nil {
155+
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
156+
}
157+
...
158+
159+
go func() {
160+
resyncCh, cleanup := r.resyncChan()
161+
defer func() {
162+
cleanup() // Call the last one written into cleanup
163+
}()
164+
for {
165+
select {
166+
case <-resyncCh:
167+
case <-stopCh:
168+
return
169+
case <-cancelCh:
170+
return
171+
}
172+
if r.ShouldResync == nil || r.ShouldResync() {
173+
klog.V(4).Infof("%s: forcing resync", r.name)
174+
if err := r.store.Resync(); err != nil {
175+
resyncerrc <- err
176+
return
177+
}
178+
}
179+
cleanup()
180+
resyncCh, cleanup = r.resyncChan()
181+
}
182+
}()
183+
184+
for {
185+
...
186+
// Watch 会在 timeoutSeconds 后超时,这时 r.watchHandler() 出错,ListAndWatch() 方法出错返回
187+
// 这时 Reflecter 会等待 r.period 事件后重新执行 ListAndWatch() 方法;
188+
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
189+
options = metav1.ListOptions{
190+
ResourceVersion: resourceVersion,
191+
TimeoutSeconds: &timeoutSeconds,
192+
}
193+
w, err := r.listerWatcher.Watch(options)
194+
...
195+
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
196+
if err != errorStopRequested {
197+
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
198+
}
199+
return nil
200+
}
201+
}
202+
}
203+
204+
// syncWith replaces the store's items with the given list.
205+
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
206+
found := make([]interface{}, 0, len(items))
207+
for _, item := range items {
208+
found = append(found, item)
209+
}
210+
return r.store.Replace(found, resourceVersion)
211+
}
212+
213+
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
214+
...
215+
loop:
216+
for {
217+
select {
218+
...
219+
case event, ok := <-w.ResultChan():
220+
...
221+
newResourceVersion := meta.GetResourceVersion()
222+
switch event.Type {
223+
case watch.Added:
224+
err := r.store.Add(event.Object)
225+
if err != nil {
226+
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
227+
}
228+
case watch.Modified:
229+
err := r.store.Update(event.Object)
230+
if err != nil {
231+
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
232+
}
233+
case watch.Deleted:
234+
err := r.store.Delete(event.Object)
235+
if err != nil {
236+
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
237+
}
238+
default:
239+
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
240+
}
241+
*resourceVersion = newResourceVersion
242+
r.setLastSyncResourceVersion(newResourceVersion)
243+
eventCount++
244+
}
245+
}
246+
...
247+
}
248+
```

0 commit comments

Comments
 (0)