# Kubernetes Deployment 源码分析（二）


## 概述

> 源码版本：kubernetes-v1.22.3 / commit-id: c92036

**Deployment** 是最常用的 Kubernetes 原生 **Workload** 资源之一，我们一开始尝试使用 Kubernetes 的时候大概率就是从运行一个 Deployment 类型的工作负载开始的。

在上一篇[《Kubernetes Deployment 源码分析（一）》](../k8s-deployment-1/)中我们过了下 Deployment 的全部特性，主要介绍“滚动更新”和“回滚”等主要功能，从而心中有个概念，知道 Deployment 的能力边界在那里，所以今天以此为基础，我们继续从源码角度看下 Deployment 的实现。

《Kubernetes Deployment Controller 源码分析》分为两讲：

- [《Kubernetes Deployment 源码分析（一）》](../k8s-deployment-1/) - 功能特性
- [《Kubernetes Deployment 源码分析（二）》](../k8s-deployment-2/) - 源码流程

## startDeploymentController 入口逻辑

DeploymentController 的初始化和启动入口是 `startDeploymentController()` 函数

- **cmd/kube-controller-manager/app/apps.go:72**

```go
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
   dc, err := deployment.NewDeploymentController(
      ctx.InformerFactory.Apps().V1().Deployments(),
      ctx.InformerFactory.Apps().V1().ReplicaSets(),
      ctx.InformerFactory.Core().V1().Pods(),
      ctx.ClientBuilder.ClientOrDie("deployment-controller"),
   )
   if err != nil {
      return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
   }
   go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
   return nil, true, nil
}
```

在 `startDeploymentController()` 函数中先通过 `NewDeploymentController()` 方法初始化一个 **DeploymentController** 实例，这里的参数 DeploymentInformer、ReplicaSetInformer、PodInformer 和 Clientset，因而 **DeploymentController** 也就具备了获取Deployment、 ReplicaSet、Pod 三类资源变更事件以及 CURD apiserver 操作各种资源的能力。接着这个函数中又调用了 **DeploymentController** 的 `Run()` 方法来启动 DeploymentController，这里的参数 ConcurrentDeploymentSyncs 默认值是 5，也就是默认情况下并发调谐的 Deployment 数量是 5 个。

## DeploymentController 对象

继续看下 DeploymentController 对象的定义和初始化。

### DeploymentController 类型定义

接着来看 DeploymentController 类型的定义

- **pkg/controller/deployment/deployment_controller.go:68**

```go
type DeploymentController struct {
   // ReplicaSet 操控器
   rsControl     controller.RSControlInterface
   client        clientset.Interface
   eventRecorder record.EventRecorder

   syncHandler func(dKey string) error
   // 测试用
   enqueueDeployment func(deployment *apps.Deployment)

   // 用来从 cache 里 get/list Deployment
   dLister appslisters.DeploymentLister
   // 用来从 cache 里 get/list ReplicaSet
   rsLister appslisters.ReplicaSetLister
   // 用来从 cache 里 get/list Pod
   podLister corelisters.PodLister

   dListerSynced cache.InformerSynced
   rsListerSynced cache.InformerSynced
   podListerSynced cache.InformerSynced

   // 工作队列，限速队列实现
   queue workqueue.RateLimitingInterface
}
```

### DeploymentController 初始化

- **pkg/controller/deployment/deployment_controller.go:101**

```go
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
   // Event 相关逻辑
   eventBroadcaster := record.NewBroadcaster()
   eventBroadcaster.StartStructuredLogging(0)
   eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

   // ……
  
   // new dc
   dc := &DeploymentController{
      client:        client,
      eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
      queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
   }
   // 主要是 clientset
   dc.rsControl = controller.RealRSControl{
      KubeClient: client,
      Recorder:   dc.eventRecorder,
   }
   // ResourceEventHandler 配置，后面会分析
   dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc:    dc.addDeployment,
      UpdateFunc: dc.updateDeployment,
      DeleteFunc: dc.deleteDeployment,
   })
   rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc:    dc.addReplicaSet,
      UpdateFunc: dc.updateReplicaSet,
      DeleteFunc: dc.deleteReplicaSet,
   })
   podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      DeleteFunc: dc.deletePod,
   })
   
   // 这里有主要逻辑，后面会讲
   dc.syncHandler = dc.syncDeployment
   dc.enqueueDeployment = dc.enqueue
   // 各种 lister
   dc.dLister = dInformer.Lister()
   dc.rsLister = rsInformer.Lister()
   dc.podLister = podInformer.Lister()
   dc.dListerSynced = dInformer.Informer().HasSynced
   dc.rsListerSynced = rsInformer.Informer().HasSynced
   dc.podListerSynced = podInformer.Informer().HasSynced
   return dc, nil
}
```

## ResourceEventHandler

上面提到了几个 ResourceEventHandler 回调函数：

- addDeployment
- updateDeployment
- deleteDeployment
- addReplicaSet
- updateReplicaSet
- deleteReplicaSet
- deletePod

逐个分析下。

### Deployment 变更事件

这里逻辑比较简单，三个方法一起看：

- **pkg/controller/deployment/deployment_controller.go:167**

```go
func (dc *DeploymentController) addDeployment(obj interface{}) {
   d := obj.(*apps.Deployment)
   klog.V(4).InfoS("Adding deployment", "deployment", klog.KObj(d))
   // 新增 Deployment 时直接 enqueue
   dc.enqueueDeployment(d)
}

func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
   oldD := old.(*apps.Deployment)
   curD := cur.(*apps.Deployment)
   klog.V(4).InfoS("Updating deployment", "deployment", klog.KObj(oldD))
   // old Deployment 只用来打印一个日志，cur Deployment enqueue
   dc.enqueueDeployment(curD)
}

func (dc *DeploymentController) deleteDeployment(obj interface{}) {
   d, ok := obj.(*apps.Deployment)
   if !ok {
      // 处理 DeletedFinalStateUnknown 场景
      tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
      if !ok {
         utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
         return
      }
      d, ok = tombstone.Obj.(*apps.Deployment)
      if !ok {
         utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Deployment %#v", obj))
         return
      }
   }
   klog.V(4).InfoS("Deleting deployment", "deployment", klog.KObj(d))
   // 入队
   dc.enqueueDeployment(d)
}
```

### ReplicaSet 变更事件

然后来看 ReplicaSet 相关回调函数的实现。

1、**Added**

- **pkg/controller/deployment/deployment_controller.go:199**

```go
func (dc *DeploymentController) addReplicaSet(obj interface{}) {
   rs := obj.(*apps.ReplicaSet)
   // 如果是准备删除了，重启的过程会收到 Added 事件，这时候直接调用删除操作
   if rs.DeletionTimestamp != nil {
      dc.deleteReplicaSet(rs)
      return
   }
   // 查询对应的 Deployment
   if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
      d := dc.resolveControllerRef(rs.Namespace, controllerRef)
      if d == nil {
         return
      }
      klog.V(4).InfoS("ReplicaSet added", "replicaSet", klog.KObj(rs))
      // 将这个 Deployment 加入工作队列
      dc.enqueueDeployment(d)
      return
   }

   // 如果是一个孤儿 ReplicaSet，则看是不是能找到一个 Deployment 来领养
   ds := dc.getDeploymentsForReplicaSet(rs)
   if len(ds) == 0 {
      return
   }
   klog.V(4).InfoS("Orphan ReplicaSet added", "replicaSet", klog.KObj(rs))
   // 一般只有一个 Deployment，但是也不能排出多个的情况，所以这里用的是 ds 列表，循环 enqueue
   for _, d := range ds {
      dc.enqueueDeployment(d)
   }
}
```

2、**Updated**

- **pkg/controller/deployment/deployment_controller.go:256**

```go
func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
   curRS := cur.(*apps.ReplicaSet)
   oldRS := old.(*apps.ReplicaSet)
   if curRS.ResourceVersion == oldRS.ResourceVersion {
      // Resync 的时候 RV 相同，不做处理
      return
   }

   curControllerRef := metav1.GetControllerOf(curRS)
   oldControllerRef := metav1.GetControllerOf(oldRS)
   controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
   if controllerRefChanged && oldControllerRef != nil {
      // 如果 rs 的 ref 变更了，就需要通知老的 ref 对应的 Deployment
      if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
         dc.enqueueDeployment(d)
      }
   }

   if curControllerRef != nil {
      d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
      if d == nil {
         return
      }
      klog.V(4).InfoS("ReplicaSet updated", "replicaSet", klog.KObj(curRS))
      // 当前 rs 对应 dp 入队
      dc.enqueueDeployment(d)
      return
   }

   // 孤儿 rs 的场景，和 Added 时处理逻辑一样
   labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
   if labelChanged || controllerRefChanged {
      ds := dc.getDeploymentsForReplicaSet(curRS)
      if len(ds) == 0 {
         return
      }
      klog.V(4).InfoS("Orphan ReplicaSet updated", "replicaSet", klog.KObj(curRS))
      for _, d := range ds {
         dc.enqueueDeployment(d)
      }
   }
}
```

3、**Deleted**

- **pkg/controller/deployment/deployment_controller.go:304**

```go
func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
   rs, ok := obj.(*apps.ReplicaSet)

   // 删除场景需要处理的 DeletedFinalStateUnknown 场景
   if !ok {
      tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
      if !ok {
         utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
         return
      }
      rs, ok = tombstone.Obj.(*apps.ReplicaSet)
      if !ok {
         utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
         return
      }
   }

   // 孤儿 rs 被删除时没有 Deployment 需要关心
   controllerRef := metav1.GetControllerOf(rs)
   if controllerRef == nil {
      return
   }
   d := dc.resolveControllerRef(rs.Namespace, controllerRef)
   if d == nil {
      return
   }
   klog.V(4).InfoS("ReplicaSet deleted", "replicaSet", klog.KObj(rs))
   // 入队
   dc.enqueueDeployment(d)
}
```





## DeploymentController 启动

前面看了哪些 Event 会向 workqueue 中添加 item，接着看下这些 item 是怎么被消费的。

### Run()

`Run()` 方法本身很简洁，根据给定的并发数，也就是默认 5 并发，启动 dc.worker

- **pkg/controller/deployment/deployment_controller.go:149**

```go
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
   defer dc.queue.ShutDown()

   klog.InfoS("Starting controller", "controller", "deployment")
   defer klog.InfoS("Shutting down controller", "controller", "deployment")

   if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
      return
   }

   for i := 0; i < workers; i++ {
      go wait.Until(dc.worker, time.Second, stopCh)
   }

   <-stopCh
}
```

继续看 worker 的内容

- **pkg/controller/deployment/deployment_controller.go:460**

```go
func (dc *DeploymentController) worker() {
   for dc.processNextWorkItem() {
   }
}

func (dc *DeploymentController) processNextWorkItem() bool {
   key, quit := dc.queue.Get() // 从 workqueue 中获取一个 item
   if quit {
      return false
   }
   defer dc.queue.Done(key)
   // 主要逻辑
   err := dc.syncHandler(key.(string))
   dc.handleErr(err, key)

   return true
}
```

这里从 workqueue 里拿到一个 key 之后，通过调用 `syncHandler()` 方法来处理，前面强调过这行代码：

- **dc.syncHandler = dc.syncDeployment**

所以接着我们继续跟 dc.syncDeployment 的实现。

### syncDeployment

`syncDeployment()` 方法做的事情是拿着 workqueue 里出队的 key，根据这个 key 来 sync 对应的 Deployment，继续看下具体的逻辑。

- **pkg/controller/deployment/deployment_controller.go**

```go
func (dc *DeploymentController) syncDeployment(key string) error {
   // 从 key 中分割出 namespace 和 name
   namespace, name, err := cache.SplitMetaNamespaceKey(key)
   if err != nil {
      klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
      return err
   }

   startTime := time.Now()
   klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
   defer func() {
      klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
   }()
   // 根据 namespace 和 name 从 cache 中检索对应 Deployment 对象
   deployment, err := dc.dLister.Deployments(namespace).Get(name)
   if errors.IsNotFound(err) {
      klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
      return nil
   }
   if err != nil {
      return err
   }

   // 为了不改动这个 cache，这是一个 ThreadSafeStore
   d := deployment.DeepCopy()

   // 空 LabelSelector 会匹配到所有 pods，发一个 Warning Event，更新 .Status.ObservedGeneration 然后返回
   everything := metav1.LabelSelector{}
   if reflect.DeepEqual(d.Spec.Selector, &everything) {
      dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
      if d.Status.ObservedGeneration < d.Generation {
         d.Status.ObservedGeneration = d.Generation
         dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
      }
      return nil
   }

   // 获取当前 Deployment 拥有的所有 ReplicaSet，同时会更新这些 ReplicaSet 的 ControllerRef
   rsList, err := dc.getReplicaSetsForDeployment(d)
   if err != nil {
      return err
   }
   // 这个 map 是 map[types.UID][]*v1.Pod 类型，key 是 rs 的 UID，value 是对应 rs 管理的所有 pod 列表
   podMap, err := dc.getPodMapForDeployment(d, rsList)
   if err != nil {
      return err
   }

   // 已经标记要删除了，这时候只更新状态
   if d.DeletionTimestamp != nil {
      return dc.syncStatusOnly(d, rsList)
   }

   // 根据 .Spec.Pause配置看是否更新 Deployment 的 conditions
   if err = dc.checkPausedConditions(d); err != nil {
      return err
   }

   if d.Spec.Paused {
      // Pause 或 scale 时的调谐逻辑
      return dc.sync(d, rsList)
   }

   // 应该过期了，老版本的 "deprecated.deployment.rollback.to" 注解回滚逻辑
   if getRollbackTo(d) != nil {
      // 回滚到旧版本的逻辑
      return dc.rollback(d, rsList)
   }
   // 如果是 scale
   scalingEvent, err := dc.isScalingEvent(d, rsList)
   if err != nil {
      return err
   }
   if scalingEvent {
      // Pause 或 scale 时的调谐逻辑
      return dc.sync(d, rsList)
   }

   switch d.Spec.Strategy.Type {
   // 重建策略
   case apps.RecreateDeploymentStrategyType:
      return dc.rolloutRecreate(d, rsList, podMap)
   // 滚动更新策略
   case apps.RollingUpdateDeploymentStrategyType:
      return dc.rolloutRolling(d, rsList)
   }
   return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
```

## 小结

`syncDeployment()` 方法看完之后，Deployment 控制器的逻辑就算过完一遍了。当然这个方法内部涉及到的一些小方法的调用这里只是简单介绍其功能，并没有深究所有实现细节，不过这些小方法的逻辑都不难，就暂不赘述了。

对 Deployment Controller 的代码分析在 client-go 和 Job Controller 之后，所以这里看起来感觉会很简单，对应有些描述也没有那么详细，如果大家看过前面我发的相关文章，看本文的逻辑应该也不会感觉吃力，反正我希望你先回过头看下我之前发的相关文章，最新版可以在我的博客网站 [Daniel Hu's Blog](https://www.danielhu.cn/) 查阅。

