# Kubernetes client-go 源码分析 - DeltaFIFO


## 概述

>  源码版本信息
>
>  - Project: kubernetes
>  - Branch: master
>  - Last commit id: d25d741c
>  - Date: 2021-09-26

我们在[《Kubernetes client-go 源码分析 - 开篇》](../k8s-client-go-summary/)里提到了自定义控制器涉及到的 client-go 组件整体工作流程，大致如下图：

![client-go](client-go.png "client-go")

DeltaFIFO 是上面的一个重要组件，今天我们来详细研究下 client-go 里 DeltaFIFO 相关代码。

![Delta FIFO](delta_fifo.png "Delta FIFO")

## Queue 接口

类似 workqueue 里的队列概念，这里也有一个队列，Queue 接口定义在 *client-go/tools/cache* 包中的 *fifo.go* 文件里，看下有哪些方法：

```go
type Queue interface {
   Store
   Pop(PopProcessFunc) (interface{}, error) // 会阻塞，直到有一个元素可以被 pop 出来，或者队列关闭
   AddIfNotPresent(interface{}) error
   HasSynced() bool
   Close()
}
```

这里嵌里一个 Store 接口，对应定义如下：

```go
type Store interface {
   Add(obj interface{}) error
   Update(obj interface{}) error
   Delete(obj interface{}) error
   List() []interface{}
   ListKeys() []string
   Get(obj interface{}) (item interface{}, exists bool, err error)
   GetByKey(key string) (item interface{}, exists bool, err error)
   Replace([]interface{}, string) error
   Resync() error
}
```

Store 接口的方法都比较直观，Store 的实现有很多，我们等下看 Queue 里用到的是哪个实现。

Queue 接口的实现是 FIFO 和 DeltaFIFO 两个类型，我们在 Informer 里用到的是 DeltaFIFO，而 DeltaFIFO 也没有依赖 FIFO，所以下面我们直接看 DeltaFIFO 是怎么实现的。

## DeltaFIFO

- **client-go/tools/cache/delta_fifo.go:97**

```go
type DeltaFIFO struct {
   lock sync.RWMutex
   cond sync.Cond
   items map[string]Deltas
   queue []string               // 这个 queue 里是没有重复元素的，和上面 items 的 key 保持一致
   populated bool
   initialPopulationCount int
   keyFunc KeyFunc              // 用于构造上面 map 用到的 key
   knownObjects KeyListerGetter // 用来检索所有的 keys
   closed bool
   emitDeltaTypeReplaced bool
}
```

这里有一个 Deltas 类型，看下具体的定义：

```go
type Deltas []Delta

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Replaced DeltaType = "Replaced"
	Sync DeltaType = "Sync"
)

```

可以看到 Delta 结构体保存的是 DeltaType（就是一个字符串）和发生了这种 Delta 的具体对象。

DeltaFIFO 内部主要维护的一个队列和一个 map，直观一点表示如下：

![Delta FIFO](delta_fifo.png "Delta FIFO")

DeltaFIFO 的 New 函数是 `NewDeltaFIFOWithOptions()`

- **client-go/tools/cache/delta_fifo.go:218**

```go
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
   if opts.KeyFunction == nil {
      opts.KeyFunction = MetaNamespaceKeyFunc
   }

   f := &DeltaFIFO{
      items:        map[string]Deltas{},
      queue:        []string{},
      keyFunc:      opts.KeyFunction,
      knownObjects: opts.KnownObjects,

      emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
   }
   f.cond.L = &f.lock
   return f
}
```

## 元素增删改 - queueActionLocked()

可以注意到 DeltaFIFO 的 Add() 等方法等方法体都很简短，大致这样：

```go
func (f *DeltaFIFO) Add(obj interface{}) error {
   f.lock.Lock()
   defer f.lock.Unlock()
   f.populated = true
   return f.queueActionLocked(Added, obj)
}
```

里面的逻辑就是调用 `queueActionLocked()` 方法传递对应的 DeltaType 进去，前面提到过 DeltaType 就是 Added、Updated、Deleted 等字符串，所以我们直接先看 `queueActionLocked()` 方法的实现。

- **client-go/tools/cache/delta_fifo.go:409**

```go
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
   id, err := f.KeyOf(obj) // 计算这个对象的 key
   if err != nil {
      return KeyError{obj, err}
   }
   oldDeltas := f.items[id] // 从 items map 里获取当前的 Deltas
   newDeltas := append(oldDeltas, Delta{actionType, obj}) // 构造一个 Delta，添加到 Deltas 中，也就是 []Delta 里
   newDeltas = dedupDeltas(newDeltas) // 如果最近个 Delta 是重复的，则保留后一个；目前版本只处理的 Deleted 重复场景

   if len(newDeltas) > 0 { // 理论上 newDeltas 长度一定大于0
      if _, exists := f.items[id]; !exists {
         f.queue = append(f.queue, id) // 如果 id 不存在，则在队列里添加
      }
      f.items[id] = newDeltas // 如果 id 已经存在，则只更新 items map 里对应这个 key 的 Deltas
      f.cond.Broadcast()
   } else { // 理论上这里执行不到
      if oldDeltas == nil {
         klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
         return nil
      }
      klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
      f.items[id] = newDeltas
      return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
   }
   return nil
}
```

到这里再反过来看 Add() Delete() Update() Get() 等函数，就很清晰了，只是将对应变化类型的 obj 添加到队列中。

## Pop()

Pop 按照元素的添加或更新顺序有序返回一个元素(Deltas)，在队列为空时会阻塞。另外 Pop 过程会先从队列中删除一个元素然后返回，所以如果处理失败了需要通过 `AddIfNotPresent()` 方法将这个元素加回到队列中。

Pop 的参数是 `type PopProcessFunc func(interface{}) error` 类型的 process，在 `Pop()` 函数中直接将队列里的第一个元素出队，然后丢给 process 处理，如果处理失败会重新入队，但是这个 Deltas 和对应的错误信息会被返回。

- **client-go/tools/cache/delta_fifo.go:515**

```go
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
   f.lock.Lock()
   defer f.lock.Unlock()
   for { // 这个循环其实没有意义，和下面的 !ok 一起解决了一个不会发生的问题
      for len(f.queue) == 0 { // 如果为空则进入这个循环
         if f.closed { // 队列关闭则直接返回
            return nil, ErrFIFOClosed
         }
         f.cond.Wait() // 等待
      }
      id := f.queue[0] // queue 里放的是 key
      f.queue = f.queue[1:] // queue 中删除这个 key
      depth := len(f.queue)
      if f.initialPopulationCount > 0 { // 第一次调用 Replace() 插入的元素数量
         f.initialPopulationCount--
      }
      item, ok := f.items[id] // 从 items map[string]Deltas 中获取一个 Deltas
      if !ok { // 理论上不可能找不到，为此引入了上面的 for 嵌套，感觉不是很好
         klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
         continue
      }
      delete(f.items, id) // items map 中也删除这个元素
      // 当队列长度超过 10 并且处理一个元素时间超过 0.1 s 时打印日志；队列长度理论上不会变长因为处理一个元素时是阻塞的，这时候新的元素加不进来
      if depth > 10 {
         trace := utiltrace.New("DeltaFIFO Pop Process",
            utiltrace.Field{Key: "ID", Value: id},
            utiltrace.Field{Key: "Depth", Value: depth},
            utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
         defer trace.LogIfLong(100 * time.Millisecond)
      }
      err := process(item) // 丢给 PopProcessFunc 处理
      if e, ok := err.(ErrRequeue); ok { // 如果需要 requeue 则加回到队列里
         f.addIfNotPresent(id, item)
         err = e.Err
      }
      // 返回这个 Deltas 和错误信息
      return item, err
   }
}
```

我们看一下 Pop() 的实际调用场景：

- **client-go/tools/cache/controller.go:181**

```go
func (c *controller) processLoop() {
   for {
      obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
      if err != nil {
         if err == ErrFIFOClosed {
            return
         }
         if c.config.RetryOnError {
            c.config.Queue.AddIfNotPresent(obj) // 其实 Pop 内部已经调用了 AddIfNotPresent，这里也有点多余；也许更加健壮吧
         }
      }
   }
}
```

到这还有一个疑问，就是 process 函数是怎么实现的？我们看 sharedIndexInformer 里的 process 函数逻辑（在我的另外一篇文章：《Kubernetes client-go Informer 源码分析》中会再次详细介绍这个方法）：

- **client-go/tools/cache/shared_informer.go:537**

```go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
   s.blockDeltas.Lock()
   defer s.blockDeltas.Unlock()

   // 这个遍历是从旧到新的过程
   for _, d := range obj.(Deltas) {
      switch d.Type {
      case Sync, Replaced, Added, Updated: // 下面一个 case 是 Deleted
         s.cacheMutationDetector.AddObject(d.Object)
         if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
            // 更新 indexer
            if err := s.indexer.Update(d.Object); err != nil {
               return err
            }

            isSync := false
            switch {
            case d.Type == Sync:
               isSync = true
            case d.Type == Replaced:
               if accessor, err := meta.Accessor(d.Object); err == nil {
                  if oldAccessor, err := meta.Accessor(old); err == nil {
                     isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
                  }
               }
            }
            // 更新通知
            s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
         } else {
            // 将 obj 加到 indexer 里
            if err := s.indexer.Add(d.Object); err != nil {
               return err
            }
            // 添加通知
            s.processor.distribute(addNotification{newObj: d.Object}, false)
         }
      case Deleted: // 如果是删除，则从 indexer 中删除 obj
         if err := s.indexer.Delete(d.Object); err != nil {
            return err
         }
         // 发布一个删除消息
         s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
      }
   }
   return nil
}
```

## Replace()

Replace() 简单地做两件事：

1. 给传入的对象列表添加一个 Sync/Replace DeltaType 的 Delta
2. 然后执行一些删除逻辑

这里的 Replace() 过程可以简单理解成传递一个新的 []Deltas 过来，如果当前 DeltaFIFO 里已经有这些元素，则追加一个 Sync/Replace 动作，反之 DeltaFIFO 里多出来的 Deltas 则可能是与 apiserver 失联导致实际已经删除，但是删除动作没有 watch 到的那些对象，所以直接追加一个 Deleted 的 Delta；

```go
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
   f.lock.Lock()
   defer f.lock.Unlock()
   keys := make(sets.String, len(list)) // 用来保存 list 中每个 item 的 key

   // 老代码兼容逻辑
   action := Sync
   if f.emitDeltaTypeReplaced {
      action = Replaced
   }

   for _, item := range list { // 在每个 item 后面添加一个 Sync/Replaced 动作
      key, err := f.KeyOf(item)
      if err != nil {
         return KeyError{item, err}
      }
      keys.Insert(key)
      if err := f.queueActionLocked(action, item); err != nil {
         return fmt.Errorf("couldn't enqueue object: %v", err)
      }
   }

   if f.knownObjects == nil {
      queuedDeletions := 0
      for k, oldItem := range f.items { // 删除 f.items 里的老元素
         if keys.Has(k) {
            continue
         }

         var deletedObj interface{}
         if n := oldItem.Newest(); n != nil { // 如果 Deltas 不为空则有返回值
            deletedObj = n.Object
         }
         queuedDeletions++
         // 标记删除；因为和 apiserver 失联引起的删除状态没有及时获取到，所以这里是 DeletedFinalStateUnknown 类型
         if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
            return err
         }
      }

      if !f.populated {
         f.populated = true
         f.initialPopulationCount = keys.Len() + queuedDeletions
      }

      return nil
   }

   knownKeys := f.knownObjects.ListKeys() // key 就是例如 "default/pod_1" 这种字符串
   queuedDeletions := 0
   for _, k := range knownKeys {
      if keys.Has(k) {
         continue
      }
      // 新列表里不存在的老元素标记为将要删除
      deletedObj, exists, err := f.knownObjects.GetByKey(k)
      if err != nil {
         deletedObj = nil
         klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
      } else if !exists {
         deletedObj = nil
         klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
      }
      queuedDeletions++
      // 添加一个删除动作；因为与 apiserver 失联等场景会引起删除事件没有 wathch 到，所以是 DeletedFinalStateUnknown 类型
      if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
         return err
      }
   }

   if !f.populated {
      f.populated = true
      f.initialPopulationCount = keys.Len() + queuedDeletions
   }

   return nil
}
```

这里有一个 knownObjects 属性，要完整理解 Replace() 逻辑还得看下 knownObjects 是什么逻辑。

我们去跟 knownObjects 属性的初始化，可以看到其引用的是 cache 类型实现的 Store，cache 是实现 Indexer 的那个 cache，Indexer 的源码分析可以在我的另外一篇文章[《Kubernetes client-go Indexer / ThreadSafeStore 源码分析》](../k8s-client-go-indexer/) 中看到。

- **client-go/tools/cache/store.go:258**

```go
func NewStore(keyFunc KeyFunc) Store {
   return &cache{
      cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
      keyFunc:      keyFunc,
   }
}
```

这里是当作一个 Store 来用，而不是 Indexer。中 NewStore() 函数调用时传递的参数是：

```go
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
```

```go
// 处理了 DeletedFinalStateUnknown 对象获取 key 问题
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
   if d, ok := obj.(DeletedFinalStateUnknown); ok {
      return d.Key, nil
   }
   return MetaNamespaceKeyFunc(obj)
}
```

所以 knownObjects 通过 cache 类型实例，使用了和 Indexer 类似的机制，通过内部 ThreadSafeStore 来实现了检索队列所有元素的 keys 的能力。

DeltaFIFO 和 Indexer 之间还有一个桥梁 Informer，我们这里简单提到了 *sharedIndexInformer* 的 *HandleDeltas()* 方法，后面详细分析 Informer 的逻辑，最终再将整个自定义控制器和 client-go 相关组件逻辑串在一起。

> 转载请注明本文来自[胡涛的个人网站](https://www.danielhu.cn) - <https://www.danielhu.cn>

