2
2
3
3
Reflector 是 Kubernetes 的事件反射器,它 List 和 Watch etcd 中的资源对象变化,将其更新到内部的 DeltaFIFO 中。
4
4
5
- 在介绍 Relfector 前,先介绍 Reflector 使用的 ListerWatcher 接口;
6
-
7
- func NewIndexerInformer(
8
- lw ListerWatcher,
9
- objType runtime.Object,
10
- resyncPeriod time.Duration,
11
- h ResourceEventHandler,
12
- indexers Indexers,
13
- ) (Indexer, Controller)
14
-
15
- -> 创建 controller
16
- // Config contains all the settings for a Controller.
17
- type Config struct {
18
- ...
19
- // Something that can list and watch your objects.
20
- ListerWatcher
21
- ...
22
- }
23
-
24
- -> 创建 Reflector(c.config.ListerWatcher 是传给 NewIndexerInformer()的 lw):
25
- func (c * controller) Run(stopCh <-chan struct{}) {
26
- ...
27
- r := NewReflector(
28
- c.config.ListerWatcher,
29
- c.config.ObjectType,
30
- c.config.Queue,
31
- c.config.FullResyncPeriod,
32
- )
33
- ...
34
- c.reflector = r
35
- ...
36
- }
5
+ 在介绍 Relfector 前,先介绍 Reflector 使用的 ListerWatcher 接口。
37
6
38
- -> ListAndWatch
39
- func (r * Reflector) ListAndWatch(stopCh <-chan struct{}) error {
40
- options := metav1.ListOptions{ResourceVersion: "0"}
41
- list, err := r.listerWatcher.List(options)
42
- ...
43
- for {
44
- options = metav1.ListOptions{
45
- ResourceVersion: resourceVersion,
46
- TimeoutSeconds: &timeoutSeconds,
47
- }
48
- ...
49
- w, err := r.listerWatcher.Watch(options)
50
- ...
51
- if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
52
- ...
53
- }
54
- }
55
- }
7
+ ## ListWatcher 接口
56
8
57
- ## ListAndWatch
9
+ ListWatcher 接口定义了 List 和 Watch 特定资源类型的方法。
58
10
59
- // ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
11
+ ``` go
12
+ // 来源于 k8s.io/client-go/tools/cache/listwatch.go
60
13
type ListerWatcher interface {
61
- // List should return a list type object; the Items field will be extracted, and the
62
- // ResourceVersion field will be used to start the watch in the right place.
14
+ // List() 返回对象列表;从这些对象中提取 ResourceVersion,用户后续的 Watch() 方法参数;
63
15
List (options metav1.ListOptions ) (runtime.Object , error )
64
- // Watch should begin a watch at the specified version.
16
+ // Watch() 方法从指定的版本(options 中指定)开始 Watch。
65
17
Watch (options metav1.ListOptions ) (watch.Interface , error )
66
18
}
67
19
@@ -71,32 +23,35 @@ type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
71
23
// WatchFunc knows how to watch resources
72
24
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
73
25
74
- // ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
75
- // It is a convenience function for users of NewReflector, etc.
76
- // ListFunc and WatchFunc must not be nil
26
+ // Getter interface knows how to access Get method from RESTClient.
27
+ type Getter interface {
28
+ Get () *restclient.Request
29
+ }
30
+ ```
31
+
32
+ ## 实现 ListWatcher 接口的类型 ListWatch
33
+
34
+ ListWatch 类型实现了 ` ListWatcher ` 接口,被 ` NewReflector() ` 函数和各 K8S 内置资源对象的 ` NewFilteredXXXInformer() ` 函数(见后文)使用。
35
+ ``` go
36
+ // 来源于 k8s.io/client-go/tools/cache/listwatch.go
77
37
type ListWatch struct {
78
38
ListFunc ListFunc
79
39
WatchFunc WatchFunc
80
- // DisableChunking requests no chunking for this list watcher.
81
40
DisableChunking bool
82
41
}
42
+ ```
83
43
84
- // Getter interface knows how to access Get method from RESTClient.
85
- type Getter interface {
86
- Get() * restclient.Request
87
- }
44
+ 函数 ` NewListWatchFromClient() ` 和 ` NewFilteredListWatchFromClient() ` 返回 ListWatch 对象,传入的 Getter 参数是 K8S 特定资源类型的 Client:
88
45
89
- // NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
46
+ ``` go
47
+ // 来源于 k8s.io/client-go/tools/cache/listwatch.go
90
48
func NewListWatchFromClient (c Getter , resource string , namespace string , fieldSelector fields .Selector ) *ListWatch {
91
49
optionsModifier := func (options *metav1.ListOptions ) {
92
50
options.FieldSelector = fieldSelector.String ()
93
51
}
94
52
return NewFilteredListWatchFromClient (c, resource, namespace, optionsModifier)
95
53
}
96
54
97
- // NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
98
- // Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
99
- // to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
100
55
func NewFilteredListWatchFromClient (c Getter , resource string , namespace string , optionsModifier func (options *metav1.ListOptions )) *ListWatch {
101
56
listFunc := func (options metav1.ListOptions ) (runtime.Object , error ) {
102
57
optionsModifier (&options)
@@ -118,21 +73,61 @@ func NewFilteredListWatchFromClient(c Getter, resource string, namespace string,
118
73
}
119
74
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
120
75
}
76
+ ```
77
+
78
+ ### List() 方法
79
+
80
+ List() 方法调用 K8S Client 从 apiserver 获取一批特定类型对象列表:
121
81
82
+ ``` go
83
+ // 来源于 k8s.io/client-go/tools/cache/listwatch.go
122
84
// List a set of apiserver resources
123
85
func (lw *ListWatch ) List (options metav1 .ListOptions ) (runtime .Object , error ) {
124
86
if !lw.DisableChunking {
125
87
return pager.New (pager.SimplePageFunc (lw.ListFunc )).List (context.TODO (), options)
126
88
}
127
89
return lw.ListFunc (options)
128
90
}
91
+ ```
129
92
93
+ ### Watch() 方法
94
+
95
+ ``` go
96
+ // 来源于 k8s.io/client-go/tools/cache/listwatch.go
130
97
// Watch a set of apiserver resources
131
98
func (lw *ListWatch ) Watch (options metav1 .ListOptions ) (watch .Interface , error ) {
132
99
return lw.WatchFunc (options)
133
100
}
101
+ ```
102
+
103
+ ## 使用 ListWatch 的 K8S 资源对象
104
+
105
+ ``` go
106
+ // 来源于 k8s.io/client-go/informers/extensions/v1beta1/deployment.go
107
+ func NewFilteredDeploymentInformer (client kubernetes .Interface , namespace string , resyncPeriod time .Duration , indexers cache .Indexers , tweakListOptions internalinterfaces .TweakListOptionsFunc ) cache .SharedIndexInformer {
108
+ return cache.NewSharedIndexInformer (
109
+ &cache.ListWatch {
110
+ ListFunc: func (options v1.ListOptions ) (runtime.Object , error ) {
111
+ if tweakListOptions != nil {
112
+ tweakListOptions (&options)
113
+ }
114
+ return client.ExtensionsV1beta1 ().Deployments (namespace).List (options)
115
+ },
116
+ WatchFunc: func (options v1.ListOptions ) (watch.Interface , error ) {
117
+ if tweakListOptions != nil {
118
+ tweakListOptions (&options)
119
+ }
120
+ return client.ExtensionsV1beta1 ().Deployments (namespace).Watch (options)
121
+ },
122
+ },
123
+ &extensionsv1beta1.Deployment {},
124
+ resyncPeriod,
125
+ indexers,
126
+ )
127
+ }
128
+ ```
134
129
135
- ##
130
+ ##
136
131
137
132
``` go
138
133
func (r *Reflector ) Run (stopCh <-chan struct{}) {
0 commit comments