Store
是 K-V 类型的内存对象缓存。
Indexer
在 Store
的基础上添加了对象索引,从而实现快速查找对象的功能。
IndexFunc
类型的函数可以为对象生成索引列表。
cache
实现了 Store
和 Indexer
接口,函数 NewIndexer()
和 NewStore()
返回 cache
类型的对象。
cache
使用比较广泛,如各种 Informer
和 DeltaFIFO
用它做对象缓存和索引。
Store 是 KV 类型的对象缓存:
// 来源于 k8s.io/client-go/tools/cache/store.go
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
// 使用传入的对象列表替换 Store 中的对象
Replace([]interface{}, string) error
Resync() error
}
NewStore()
函数返回一个实现该接口的 struct cache
类型对象(见后文分析)。
Queue
接口是 Store
的超集,所以实现 Queue
接口的 FIFO
类型、DeltaFIFO
类型也实现了 Store
接口。(详见: 2.queue-fifo-delta_fifo.md)
Indexer 是在 Store 的基础上,添加了索引功能,方便快速获取(一批)对象。
// 来源于 k8s.io/client-go/tools/cache/index.go
type Indexer interface {
// Index 实现了 Store 的接口
Store
// 返回注册的、名为 indexName 的索引函数
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(indexName string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
}
NewIndexer()
函数返回一个实现该接口的 struct cache
类型对象(见后文分析)。
对象的索引是一个字符串列表,由 IndexFunc
类型的函数生成,所以索引值和对象是一对多关系。
// 来源于 k8s.io/client-go/tools/cache/index.go
type IndexFunc func(obj interface{}) ([]string, error)
client-go package 提供了名为 NamespaceIndex
的 IndexFunc 类型函数 MetaNamespaceIndexFunc
,它提取对象的 Namespace
作为索引:
// 来源于 k8s.io/client-go/tools/cache/index.go
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{meta.GetNamespace()}, nil
}
多个命名的 IndexFunc 类型函数用 Indexers
表示:
// 来源于 k8s.io/client-go/tools/cache/index.go
// map[索引函数名称]索引函数
type Indexers map[string]IndexFunc
Indexer 接口的 AddIndexers()
方法为对象添加索引函数,从而为对象生成不同类型的索引,满足各种查找需求。
类似于 IndexFunc 为对象生成索引值列表,KeyFunc
函数(见后文)为对象生成一个唯一的标识字符串,称为对象 Key。
前面说过,索引值和对象是一对多映射关系:
Index
:保存索引匹配的对象集合(用它的唯一表示 Key 表示);Indices
:保存某个索引函数生成的所有索引的对象集合;
// 来源于 k8s.io/client-go/tools/cache/index.go
// map[索引字符串]set{对象 Key 集合}
type Index map[string]sets.String
// 对象的索引。map[索引函数名称]Index
type Indices map[string]Index
后文分析 cache 类型时将用到 Index 和 Indices。
// 来源于 k8s.io/client-go/tools/cache/store.go
type KeyFunc func(obj interface{}) (string, error)
client-go package 提供了两个 KeyFunc 类型函数 MetaNamespaceKeyFunc
和 DeletionHandlingMetaNamespaceKeyFunc
(实际用的最多):
MetaNamespaceKeyFunc
:提取对象的<namespace>/<object-name>
或<object-name>
作为 Key;DeletionHandlingMetaNamespaceKeyFunc
:先检查对象是不是DeletedFinalStateUnknown
类型(见后文),如果是,直接返回该对象内的 Key 字段,否则调用MetaNamespaceKeyFunc
函数生成 Key;
// 来源于 k8s.io/client-go/tools/cache/store.go
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
// 如果对象是字符串,则直接使用它作为 Key
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
// 否则提取对象的 Meta 信息
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
}
// 如果对象有 Namespace 则用 `<namespace>/<object-name>` 作为 key
// 否则用 `<object-name>` 作为 key
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() + "/" + meta.GetName(), nil
}
return meta.GetName(), nil
}
// 来源于 k8s.io/client-go/tools/cache/controller.go
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
// DeletedFinalStateUnknown 封装了删除对象 Key 和对象自身的类型,由 DeltaFIFO.Replace() 方法产生
if d, ok := obj.(DeletedFinalStateUnknown); ok {
return d.Key, nil
}
return MetaNamespaceKeyFunc(obj)
}
与 MetaNamespaceKeyFunc()
功能相反的是 SplitMetaNamespaceKey()
函数,它将传入的 Key 分解,返回对象所在的命名空间和对象名称。
使用 KeyFunc 比较多的场景是创建 Store、Index 和 DeltaFIFO,如:
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
因为 DeltaFIFO 接收的是从 apiserver List/Watch 的 K8S 资源对象,所以可以用 MetaNamespaceKeyFunc 提取它的 NS 和 Name。
但是 NewIndexer() 返回的 clientState 一般是 Reflector 从 DeltaFIFO 弹出的 Deltas,它是一个 Delta 的列表,而 Delta.Object 可能是 K8S 资源对象,也可能是 DeletedFinalStateUnknown 类型对象。所以 NewIndexer() 使用能区分这两种类型的 DeletionHandlingMetaNamespaceKeyFunc KeyFunc。
ThreadSafeStore
通过锁机制,实现多 goroutine 可以并发访问的、带有索引功能(Indexer
接口)的对象缓存(Store
接口)。
ThreadSafeStore
本身没有实现 Indexer
和 Store
接口,但包含同名方法,后文会介绍,struct cache
类型内部使用 ThreadSafeStore
实现了 Indexer
和 Store
接口。
// 来源于 k8s.io/client-go/tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
// 下面这些方法和 Store 接口方法同名,差别在于多了唯一标识对象的 �key 参数
// 相比 Store 接口,缺少了 GetByKey() 方法
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
List() []interface{}
ListKeys() []string
Get(key string) (item interface{}, exists bool)
Replace(map[string]interface{}, string)
Resync() error
// 下面这些是 Indexer 定义的接口方法
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
}
NewThreadSafeStore()
函数返回一个实现该接口的对象,类型是 threadSafeMap
:
// 来源于 k8s.io/client-go/tools/cache/thread_safe_store.go
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},
indexers: indexers,
indices: indices,
}
}
threadSafeMap
使用 map 缓存所有对象:
// 来源于 k8s.io/client-go/tools/cache/thread_safe_store.go
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex
// 对象缓存。使用对象的 Key 作为 map key;
items map[string]interface{}
// 命名的索引函数集合。map[索引函数名称]索引函数
indexers Indexers
// 对象的索引缓存。map[索引函数名称][索引字符串]set{对象 Key 集合}
indices Indices
}
// 来源于 k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
}
func (c *threadSafeMap) Update(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
}
当 Add()/Update()
一个对象时,先将它加到缓存,然后用 updateIndices()
方法更新索引。
updateIndices()
方法使用 c.indexers
中的索引函数,为对象创建多种类型索引值列表,然后将这些索引及对象的 Key 更新到索引缓存中(c.indices
)。
// 来源于 k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// 从索引中移除老的 obj
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
}
// 遍历 c.indexers 中的索引函数,为对象生成不同类型的索引列表
for name, indexFunc := range c.indexers {
indexValues, err := indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
// 获取当前索引函数创建的索引
index := c.indices[name]
if index == nil {
index = Index{}
c.indices[name] = index
}
// 将所有索引值和对象 Key 更新到索引缓存 c.indices 中
for _, indexValue := range indexValues {
set := index[indexValue]
if set == nil {
set = sets.String{}
index[indexValue] = set
}
set.Insert(key)
}
}
}
Delete()
方法先后从索引缓存和对象缓存中删除对象。deleteFromIndices()
方法遍历 c.indexers
中的索引函数,为对象计算索引值列表,然后再从索引缓存和对象缓存中删除该对象:
// 来源于 k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) Delete(key string) {
c.lock.Lock()
defer c.lock.Unlock()
if obj, exists := c.items[key]; exists {
// 从索引缓存中删除对象
c.deleteFromIndices(obj, key)
// 从对象缓存中删除对象
delete(c.items, key)
}
}
注意:一个索引值可能匹配多个对象,所以不能直接删除索引缓存中索引值对应的对象集合。
Replace()
方法使用传入的对象列表替换内部缓存,然后重建索引:
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
c.lock.Lock()
defer c.lock.Unlock()
c.items = items
// rebuild any index
c.indices = Indices{}
for key, item := range c.items {
c.updateIndices(nil, item, key)
}
}
-
Index(indexName string, obj interface{}) ([]interface{}, error)
indexName 为索引函数名称(下同)。使用对应的索引函数为对象生成索引值列表,然后查询索引缓存,返回匹配这些索引值的对象列表(去重)。
-
ByIndex(indexName, indexKey string) ([]interface{}, error)
indexKey 为索引值,查询索引缓存,返回它匹配的对象列表;
-
IndexKeys(indexName, indexKey string) ([]string, error)
indexKey 为索引值,查询索引缓存,返回它匹配的对象 Key 列表;
-
ListIndexFuncValues(indexName string) []string
查询索引缓存,返回 indexName 对应的索引函数创建的所有索引包含的对象 Key 列表;
-
GetIndexers() Indexers
返回命名的索引函数集合
c.indexers
-
AddIndexers(newIndexers Indexers) error
将 newIndexers 中的命名函数添加到索引函数集合 c.indexers 中。
必须在添加任何对象前调用该方法,否则会出错返回。
newIndexers 中的命名函数不能与 c.indexers 中已有的函数重名,否则出错返回。
-
Resync() error
直接返回。因为 Add/Update/Delete/Replace 方法都会同时更新缓存和索引,两者时刻是同步的。
在分析 ThreadSafeStore
接口时提到过,它的方法如 Add/Update/Delete/Get()
都需要传入对象的 Key。
而 cache
则封装了 ThreadSafeStore
和 KeyFunc
,后者为添加到 ThreadSafeStore
的对象生成 Key:
// 来源于 k8s.io/client-go/tools/cache/store.go
type cache struct {
// cacheStorage bears the burden of thread safety for the cache
cacheStorage ThreadSafeStore
// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
keyFunc KeyFunc
}
cache
实现了 Indexer
和 Store
接口,函数 NewIndexer()
和 NewStore()
均返回该类型的对象:
// 来源于 k8s.io/client-go/tools/cache/store.go
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
// 来源于 k8s.io/client-go/tools/cache/store.go
func NewStore(keyFunc KeyFunc) Store {
return &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
}
后文会介绍,DeltaFIFO 使用 NewStore() 或 NewIndexer() 返回的 cache 作为 knownObjects 对象缓存,而且传给这两个函数的 KeyFunc 一般是 DeletionHandlingMetaNamespaceKeyFunc
:
-
NewInformer
函数:// 来源于 k8s.io/client-go/tools/cache/controller.go func NewInformer( lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, h ResourceEventHandler, ) (Store, Controller) { // This will hold the client state, as we know it. clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc) return clientState, newInformer(lw, objType, resyncPeriod, h, clientState) }
-
NewIndexerInformer
函数:// 来源于 k8s.io/client-go/tools/cache/controller.go func NewIndexerInformer( lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, h ResourceEventHandler, indexers Indexers, ) (Indexer, Controller) { // This will hold the client state, as we know it. clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) return clientState, newInformer(lw, objType, resyncPeriod, h, clientState) }
-
NewSharedIndexInformer
函数:// 来源于 k8s.io/client-go/tools/cache/shared_informer.go func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: objType, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)), clock: realClock, } return sharedIndexInformer }