Skip to content

Commit 685e1f0

Browse files
author
Zhang Jun
committed
init
1 parent 882b68c commit 685e1f0

5 files changed

+1232
-0
lines changed

client-go/1.store-indexer-cache.md

+384
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,384 @@
1+
Store 是 KV 类型的缓存。
2+
Indexer 在 Store 接口的基础上,添加了对象索引功能。索引是索引值和对象 Key 集合的映射表,不同 IndexFunc 为对象产生不同的索引值列表。通过索引值可以快速获取对象的 Key。
3+
cache 是实现了 Store 和 Indexer 接口的内置对象,它内部使用了实现 ThreadSafeStore 接口的对象和 KeyFunc 类型的函数。
4+
后续创建的 Informer 使用 DeletionHandlingMetaNamespaceKeyFunc 作为 KeyFunc 的 cache 实现对象缓存。
5+
6+
## Store
7+
8+
Store 是 KV 类型的缓存,用于保存和查询对象。但它没有定义生成对象标识 key 的方式,由具体的实现来定的(见后文分析的 `cache` 对象)。
9+
10+
// 来源于 k8s.io/client-go/tools/cache/store.go
11+
type Store interface {
12+
Add(obj interface{}) error
13+
Update(obj interface{}) error
14+
Delete(obj interface{}) error
15+
List() []interface{}
16+
ListKeys() []string
17+
Get(obj interface{}) (item interface{}, exists bool, err error)
18+
GetByKey(key string) (item interface{}, exists bool, err error)
19+
20+
// 使用传入的对象列表替换 Store 中的对象
21+
Replace([]interface{}, string) error
22+
Resync() error
23+
}
24+
25+
`NewStore` 函数返回一个实现该接口的 `cache` 类型对象(见后文分析)。
26+
27+
由于 `Queue` 接口是 `Store` 的超级,所以实现 `Queue` 接口的 `FIFO` 类型(fifo.go)、`DeltaFIFO` 类型(delta_file.go) 也实现了 `Store`接口。(详见: [2.queue-fifo-delta_fifo.md](2.queue-fifo-delta_fifo.md)
28+
29+
## 索引接口 Indexer
30+
Index 是在 Store 的基础上,添加了索引功能,方便后续按照索引来快速获取(一批)对象。
31+
32+
// 来源于 k8s.io/client-go/tools/cache/index.go
33+
type Indexer interface {
34+
// Index 实现了 Store 的接口
35+
Store
36+
// 返回注册的、名为 indexName 的索引函数
37+
// Retrieve list of objects that match on the named indexing function
38+
Index(indexName string, obj interface{}) ([]interface{}, error)
39+
// IndexKeys returns the set of keys that match on the named indexing function.
40+
IndexKeys(indexName, indexKey string) ([]string, error)
41+
// ListIndexFuncValues returns the list of generated values of an Index func
42+
ListIndexFuncValues(indexName string) []string
43+
// ByIndex lists object that match on the named indexing function with the exact key
44+
ByIndex(indexName, indexKey string) ([]interface{}, error)
45+
// GetIndexer return the indexers
46+
GetIndexers() Indexers
47+
48+
// AddIndexers adds more indexers to this store. If you call this after you already have data
49+
// in the store, the results are undefined.
50+
AddIndexers(newIndexers Indexers) error
51+
}
52+
53+
`NewIndexer` 函数返回一个实现该接口的 `cache` 类型对象(见后文分析)。
54+
55+
## 为对象生成索引值列表的 IndexFunc 和包含命名 IndexFunc 集合的 Indexers
56+
对象的索引是一个字符串列表,可以使用 `IndexFunc` 类型的函数来获取。
57+
58+
client-go 中提供了一个该类型的函数 `MetaNamespaceIndexFunc`,它使用对象所在的 `Namespace` 作为索引:
59+
60+
// 来源于 k8s.io/client-go/tools/cache/index.go
61+
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
62+
meta, err := meta.Accessor(obj)
63+
if err != nil {
64+
return []string{""}, fmt.Errorf("object has no meta: %v", err)
65+
}
66+
return []string{meta.GetNamespace()}, nil
67+
}
68+
69+
同时为该 IndexFunc 定义了一个名称 `NamespaceIndex string = "namespace"`,该名称后续可以作为 `Indexes` 的 map key。
70+
71+
除了 client-go 预定义的 IndexFunc,开发者也可以定义其它 IndexFunc,并给它们分别命名后保存到 Indexers 中:
72+
73+
// 来源于 k8s.io/client-go/tools/cache/index.go
74+
// map[索引函数名称]索引函数
75+
type Indexers map[string]IndexFunc
76+
77+
对象通过 IndexFunc 可以产生多个索引,而通过 KeyFunc 只能生产一个唯一表示该对象的 key。
78+
79+
80+
## 索引数据结构 Index 和 Indices
81+
82+
前面说过,一个对象通过 `IndexFunc` 可以产生多个索引,所以一个索引匹配一组对象。
83+
84+
`Index` 是用于保存某个索引匹配的对象 Key 集合。
85+
86+
`Indices` 是用命名的 IndexFunc 作为 map key 的 Index 集合,它保存各命名函数产生的所有索引,即各索引匹配的一组对象的集合。
87+
88+
// 来源于 k8s.io/client-go/tools/cache/index.go
89+
// map[索引字符串]set{对象 Key 集合}
90+
type Index map[string]sets.String
91+
92+
// 对象的索引。map[索引函数名称]Index
93+
type Indices map[string]Index
94+
95+
96+
调用 AddIndexers() 方法向 Index 添加各种索引函数,进而为 Store 中的对象创建多种索引。
97+
98+
## 多线程安全的、带有索引的缓存 ThreadSafeStore
99+
100+
`ThreadSafeStore` 通过锁机制,实现多 goroutine 可以并发访问的带有索引(Indexer 接口方法)功能的 K-V 数据库(Store 接口方法)。
101+
102+
`ThreadSafeStore` 后续用于实现 Indexer 和 Store 接口,所以它的方法集中包含 Store/Indexer 接口中定义的同名方法,但和 Store 接口中的同名方法相比,多了一个 obj key 参数,少了一个 GetByKey 方法。
103+
104+
// 来源于 k8s.io/client-go/tools/cache/thread_safe_store.go
105+
type ThreadSafeStore interface {
106+
// 下面这些方法和 Store 接口方法同名,差别在于多了唯一标识 obj 的 key 参数
107+
Add(key string, obj interface{})
108+
Update(key string, obj interface{})
109+
Delete(key string)
110+
List() []interface{}
111+
ListKeys() []string
112+
Get(key string) (item interface{}, exists bool) // 相比 Store 接口,缺少了 GetByKey(key string) 方法
113+
114+
Replace(map[string]interface{}, string)
115+
Resync() error
116+
117+
// 下面这些是 Indexer 的接口方法
118+
Index(indexName string, obj interface{}) ([]interface{}, error)
119+
IndexKeys(indexName, indexKey string) ([]string, error)
120+
ListIndexFuncValues(name string) []string
121+
ByIndex(indexName, indexKey string) ([]interface{}, error)
122+
GetIndexers() Indexers
123+
AddIndexers(newIndexers Indexers) error
124+
}
125+
126+
后文会具体分析这些方法的功能。
127+
128+
`NewThreadSafeStore` 返回一个实现该接口的对象,该对象的类型是 `threadSafeMap`
129+
130+
// 来源于 k8s.io/client-go/tools/cache/thread_safe_store.go
131+
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
132+
return &threadSafeMap{
133+
items: map[string]interface{}{},
134+
indexers: indexers,
135+
indices: indices,
136+
}
137+
}
138+
139+
`threadSafeMap` 使用内置的 items 保存所有对象。
140+
141+
// 来源于 k8s.io/client-go/tools/cache/thread_safe_store.go
142+
// threadSafeMap implements ThreadSafeStore
143+
type threadSafeMap struct {
144+
lock sync.RWMutex
145+
// 对象集合。使用对象的 Key 作为 map key;
146+
items map[string]interface{}
147+
// 命名的索引函数集合。map[索引函数名称]索引函数
148+
indexers Indexers
149+
// 对象的索引。map[索引函数名称][索引字符串]set{对象 Key 集合}
150+
indices Indices
151+
}
152+
153+
我们看看 `threadSafeMap` 的方法实现:
154+
155+
// 来源于 k8s.io/client-go/tools/cache/thread_safe_store.go
156+
func (c *threadSafeMap) Add(key string, obj interface{}) {
157+
c.lock.Lock()
158+
defer c.lock.Unlock()
159+
oldObject := c.items[key]
160+
c.items[key] = obj
161+
c.updateIndices(oldObject, obj, key)
162+
}
163+
164+
func (c *threadSafeMap) Update(key string, obj interface{}) {
165+
c.lock.Lock()
166+
defer c.lock.Unlock()
167+
oldObject := c.items[key]
168+
c.items[key] = obj
169+
c.updateIndices(oldObject, obj, key)
170+
}
171+
172+
当 Add/Update 一个 obj 时,先使用传入的 obj 更新缓存,然后调用 `updateIndices` 方法更新索引。
173+
`updateIndices` 方法分别使用 indexers 中的索引函数,为对象创建索引值(多个),然后将这些索引以及该对象的 Key 更新到索引中(c.indices)。
174+
175+
// 来源于 k8s.io/client-go/tools/cache/thread_safe_store.go
176+
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
177+
// 从索引中移除老的 obj
178+
if oldObj != nil {
179+
c.deleteFromIndices(oldObj, key)
180+
}
181+
// 遍历 c.indexers 中的索引函数,调用它们为对象产生索引列表
182+
for name, indexFunc := range c.indexers {
183+
indexValues, err := indexFunc(newObj)
184+
if err != nil {
185+
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
186+
}
187+
// 获取当前索引函数创建的索引
188+
index := c.indices[name]
189+
if index == nil {
190+
index = Index{}
191+
c.indices[name] = index
192+
}
193+
// 将所有索引值和对象 Key 更新到缓存 c.indices 中
194+
for _, indexValue := range indexValues {
195+
set := index[indexValue]
196+
if set == nil {
197+
set = sets.String{}
198+
index[indexValue] = set
199+
}
200+
set.Insert(key)
201+
}
202+
}
203+
}
204+
205+
`Delete` 方法的实现:
206+
207+
// 来源于 k8s.io/client-go/tools/cache/thread_safe_store.go
208+
func (c *threadSafeMap) Delete(key string) {
209+
c.lock.Lock()
210+
defer c.lock.Unlock()
211+
if obj, exists := c.items[key]; exists {
212+
c.deleteFromIndices(obj, key)
213+
delete(c.items, key)
214+
}
215+
}
216+
217+
先后从索引和内部缓存中删除元素。 `deleteFromIndices` 方法遍历 c.indexers 中的索引函数,为 ojb 计算索引值列表,然后再从缓存 c.indices 的索引值的对象 Key 集合中删除该对象 Key。
218+
注意:因为一个索引值可能匹配多个对象,所以不能直接删除 c.indics 的索引值。
219+
220+
`Replace` 方法使用传入的 items 更新内部缓存,然后重建索引:
221+
222+
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
223+
c.lock.Lock()
224+
defer c.lock.Unlock()
225+
c.items = items
226+
227+
// rebuild any index
228+
c.indices = Indices{}
229+
for key, item := range c.items {
230+
c.updateIndices(nil, item, key)
231+
}
232+
}
233+
234+
`ThreadSafeStore` 的其它方法功能介绍:
235+
236+
1. `Index(indexName string, obj interface{}) ([]interface{}, error)`
237+
indexName 为索引函数名称(下同)。使用对应的索引函数为 obj 创建索引值列表,然后查询索引,返回匹配这些索引值的对象列表(去重)。
238+
2. `ByIndex(indexName, indexKey string) ([]interface{}, error)`
239+
indexKey 为索引值,查询索引,返回这个索引值匹配的对象列表;
240+
3. `IndexKeys(indexName, indexKey string) ([]string, error)`
241+
indexKey 为索引值,查询索引,返回这个索引匹配的对象 Key 列表;
242+
4. `ListIndexFuncValues(indexName string) []string`
243+
查询索引,返回 indexName 对应的索引函数创建的所有索引包含的对象 Key 列表;
244+
5. `GetIndexers() Indexers`
245+
返回命名的索引函数集合 `c.indexers`
246+
6. `AddIndexers(newIndexers Indexers) error`
247+
将 nexIndexers 中的命名函数添加到 c.indexers 中。
248+
必须在添加任何对象前调用该方法,否则会出错返回。
249+
newIndexers 中的命名方法不能与 c.indexers 中保存的命名冲突,否则出错返回。
250+
7. `Resync() error`
251+
直接返回。因为 Add/Update/Delete/Replace 方法都会同时更新缓存和索引,两者时刻是同步的。
252+
253+
## KeyFunc 和实现 Store/Indexer 接口的 cache
254+
255+
在分析 `cache` 类型之前,我们先分析为对象生成唯一标识字符串的函数类型 `KeyFunc`
256+
257+
// 来源于 k8s.io/client-go/tools/cache/store.go
258+
// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
259+
type KeyFunc func(obj interface{}) (string, error)
260+
261+
client-go 提供了两个 KeyFunc 类型函数 `MetaNamespaceKeyFunc``DeletionHandlingMetaNamespaceKeyFunc` (实际用的最多):
262+
+ `MetaNamespaceKeyFunc`:提取对象的 `<namespace>/<object-name>``<object-name>` 作为 Key;
263+
+ `DeletionHandlingMetaNamespaceKeyFunc`:先检查对象是不是 DeletedFinalStateUnknown 类型,如果是直接返回对象的 Key 字段,否则调用 `MetaNamespaceKeyFunc`
264+
265+
// 来源于 k8s.io/client-go/tools/cache/store.go
266+
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
267+
// 如果对象时字符串,则直接使用它作为 Key
268+
if key, ok := obj.(ExplicitKey); ok {
269+
return string(key), nil
270+
}
271+
// 否则提取对象的 Meta 信息
272+
meta, err := meta.Accessor(obj)
273+
if err != nil {
274+
return "", fmt.Errorf("object has no meta: %v", err)
275+
}
276+
// 如果对象有 Namespace 则使用 `<namespace>/<object-name>` 作为 key
277+
// 否则使用 `<object-name>` 作为 key
278+
if len(meta.GetNamespace()) > 0 {
279+
return meta.GetNamespace() + "/" + meta.GetName(), nil
280+
}
281+
return meta.GetName(), nil
282+
}
283+
284+
// 来源于 k8s.io/client-go/tools/cache/controller.go
285+
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
286+
// DeletedFinalStateUnknown 是封装了删除对象 key 和对象本身的类型,由 DeltaFIFO.Replace() 方法产生
287+
if d, ok := obj.(DeletedFinalStateUnknown); ok {
288+
return d.Key, nil
289+
}
290+
return MetaNamespaceKeyFunc(obj)
291+
}
292+
`MetaNamespaceKeyFunc` 功能相反的是 `SplitMetaNamespaceKey` 函数,它将传入的 Key 分解,返回对应的 namespace 和 obj name。
293+
294+
然后我们再来分析内置类型 `cache`
295+
296+
`cache` 实现了 `Indexer``Store` 接口,函数 `NewIndexer``NewStore` 均返回该类型的对象:
297+
298+
// 来源于 k8s.io/client-go/tools/cache/store.go
299+
// NewIndexer returns an Indexer implemented simply with a map and a lock.
300+
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
301+
return &cache{
302+
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
303+
keyFunc: keyFunc,
304+
}
305+
}
306+
307+
// 来源于 k8s.io/client-go/tools/cache/store.go
308+
// NewStore returns a Store implemented simply with a map and a lock.
309+
func NewStore(keyFunc KeyFunc) Store {
310+
return &cache{
311+
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
312+
keyFunc: keyFunc,
313+
}
314+
}
315+
316+
在分析 `ThreadSafeStore` 时提到过,它的缓存相关方法如 `Add/Update/Delete/Get`**需要传入**对象的标识字符串 Key。而 `cache` 封装了 `ThreadSafeStore``KeyFunc` 类型成员,它使用成员 `keyFunc` 函数为添加到 `cacheStorage` 的对象生成标识字符串 Key。以 cache 的 `Add` 方法为例:
317+
318+
// 来源于 k8s.io/client-go/tools/cache/store.go
319+
func (c *cache) Add(obj interface{}) error {
320+
key, err := c.keyFunc(obj)
321+
if err != nil {
322+
return KeyError{obj, err}
323+
}
324+
c.cacheStorage.Add(key, obj)
325+
return nil
326+
}
327+
328+
cache 类型实现了 `Store``Indexer` 接口,在很多地方都有应用,如 Informer 和 DeltaFIFO:
329+
330+
1. NewInformer 函数:
331+
// 来源于 k8s.io/client-go/tools/cache/controller.go
332+
func NewInformer(
333+
lw ListerWatcher,
334+
objType runtime.Object,
335+
resyncPeriod time.Duration,
336+
h ResourceEventHandler,
337+
) (Store, Controller){
338+
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
339+
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
340+
cfg := &Config{
341+
Queue: fifo,
342+
...
343+
}
344+
...
345+
return clientState, New(cfg)
346+
}
347+
348+
2. NewIndexerInformer 函数:
349+
// 来源于 k8s.io/client-go/tools/cache/controller.go
350+
func NewIndexerInformer(
351+
lw ListerWatcher,
352+
objType runtime.Object,
353+
resyncPeriod time.Duration,
354+
h ResourceEventHandler,
355+
indexers Indexers,
356+
) (Indexer, Controller) {
357+
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
358+
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
359+
cfg := &Config{
360+
Queue: fifo,
361+
...
362+
}
363+
...
364+
return clientState, New(cfg)
365+
}
366+
3. NewSharedIndexInformer 函数:
367+
// k8s.io/client-go/tools/cache/shared_informer.go
368+
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
369+
realClock := &clock.RealClock{}
370+
sharedIndexInformer := &sharedIndexInformer{
371+
processor: &sharedProcessor{clock: realClock},
372+
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
373+
listerWatcher: lw,
374+
objectType: objType,
375+
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
376+
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
377+
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
378+
clock: realClock,
379+
}
380+
return sharedIndexInformer
381+
}
382+
383+
384+
后面文章我们讨论 Informer 使用的 Queue 和 FIFO。

0 commit comments

Comments
 (0)