# Kubernetes Job Controller 原理和源码分析（二）


## 概述

> 源码版本：kubernetes master 分支 commit-fe62fc（2021年10月14日）

Job 是主要的 Kubernetes 原生 Workload 资源之一，是在 Kubernetes 之上运行批处理任务最简单的方式，在 AI 模型训练等场景下最基础的实现版本就是拉起一个 Job 来完成一次训练任务，然后才是各种自定义 “Job” 实现进阶处理，比如分布式训练需要一个 “Job” 同时拉起多个 Pod，但是每个 Pod 的启动参数会有差异。所以深入理解 Job 的功能和实现细节是进一步开发自定义 “Job” 类型工作负载的基础。

我们在[《Kubernetes Job Controller 原理和源码分析（一）》](../job-controller-1/)中详细介绍了 Job 的特性，今天我们继续从源码角度剖析下 Job 的实现。

---

《Kubernetes Job Controller 原理和源码分析》分为三讲：

- [《Kubernetes Job Controller 原理和源码分析（一）》](../k8s-job-controller-1/) - 详细介绍 Job 的用法和支持的特性
- [《Kubernetes Job Controller 原理和源码分析（二）》](../k8s-job-controller-2/) - 源码分析第一部分，从控制器入口一直到所有 EventHandler 的具体实现，也就是“调谐任务”进入 workqueue 之前的全部逻辑
- [《Kubernetes Job Controller 原理和源码分析（三）》](../k8s-job-controller-3/) - 源码分析第二部分，从 workqueue 消费“调谐任务”，具体的调谐过程实现等代码逻辑

## 程序入口

**Job** 控制器代码入口在 *pkg/controller/job* 包的 *job_controller.go* 源文件里。在这个源文件里可以看到一个 `NewController()` 函数，用于新建一个 Job controller，这个 controller 也就是用来调谐 job 对象和其对应的 pods 的控制器。另外 **Controller** 对象有一个 `Run()` 方法，用于启动这个控制器的主流程，开始 watch 和 sync 所有的 job 对象。

这里的组织方式还是很清晰，一个 NewXxx() 函数配合一个 Run() 方法，完成一个对象的初始化和启动流程。如果向上再跟一级 `Run()` 方法的调用入口，我们还可以看到 cmd 里有这样一段代码：

- **cmd/kube-controller-manager/app/batch.go:34**

```go
func startJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
	go job.NewController(
		controllerContext.InformerFactory.Core().V1().Pods(),
		controllerContext.InformerFactory.Batch().V1().Jobs(),
		controllerContext.ClientBuilder.ClientOrDie("job-controller"),
	).Run(int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Done())
	return nil, true, nil
}
```

这里的逻辑是调用 `job.NewController()` 方法获取一个 Job controller 然后调用其 `Run()` 方法来完成控制器启动逻辑，前者的三个参数分别是 podInformer、jobInformer 和 kubeClient，后者的参数主要是并发数，也就是同时开启几个调谐 loop。

## Job controller 的创建

### Controller 对象

回到 `NewController()` 函数，我们看下里面的主要逻辑。函数声明是这样的：

```go
func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller
```

参数有三个：

- podInformer coreinformers.PodInformer
- jobInformer batchinformers.JobInformer
- kubeClient clientset.Interface

这里的 PodInformer 和 JobInformer 引用在初始化 Job controller 的时候用来设置 EventHandlerFunc，另外设置 Job controller 的 jobLister 和 podStore 属性；kubeClient 是 clientset.Interface 类型，也就是可以用来分别 Get Job 和 Pod，后面会讲到。

返回值是 `*Controller` 类型，至于为什么不叫做 JobController，咱也不知道，咱也不敢问，反正 DeploymentController、ReplicaSetController 等命名都是带上类型的，这里多多少少给人感觉不清晰。不同开发者的风格吧。先看一下类型定义：

- **pkg/controller/job/job_controller.go:80**

```go
type Controller struct {
   kubeClient clientset.Interface
   podControl controller.PodControlInterface
   updateStatusHandler func(job *batch.Job) error
   patchJobHandler     func(job *batch.Job, patch []byte) error
   syncHandler         func(jobKey string) (bool, error)
   podStoreSynced cache.InformerSynced // pod 存储是否至少更新过一次
   jobStoreSynced cache.InformerSynced // job 存储是否至少更新过一次
   expectations controller.ControllerExpectationsInterface
   jobLister batchv1listers.JobLister // jobs 存储
   podStore corelisters.PodLister // pods 存储
   queue workqueue.RateLimitingInterface // 需要更新的 Job 队列，限速队列实现
   orphanQueue workqueue.RateLimitingInterface // 孤儿 pods 队列，用来给 Job 追踪 finalizer 执行删除
   recorder record.EventRecorder // 用于发送 Event
}
```

 后面在具体的函数中看下上面的属性都有些啥作用。

### NewController()

这里有一堆的方法调用，我们先看下整体流程：

```go
func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
   // event 初始化，我们在《Kubernetes Event 原理和源码分析》中详细介绍过 event 的逻辑
   eventBroadcaster := record.NewBroadcaster()
   eventBroadcaster.StartStructuredLogging(0)
   eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

   // ……

   // 初始化 Controller，jm 应该是 job manager 的意思
   jm := &Controller{
      kubeClient: kubeClient,
      podControl: controller.RealPodControl{
         KubeClient: kubeClient,
         Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
      },
      expectations: controller.NewControllerExpectations(),
      // 限速队列我们在《Kubernetes client-go 源码分析 - workqueue》中详细介绍过
      queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
      orphanQueue:  workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"),
      recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
   }

   // 配置 jobInformer 的 ResourceEventHandler
   jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc: func(obj interface{}) {
         jm.enqueueController(obj, true) // 增
      },
      UpdateFunc: jm.updateJob, // 改
      DeleteFunc: func(obj interface{}) {
         jm.enqueueController(obj, true) // 删
      },
   })
   jm.jobLister = jobInformer.Lister()
   jm.jobStoreSynced = jobInformer.Informer().HasSynced

   // 配置 jobInformer 的 ResourceEventHandler
   podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc:    jm.addPod,
      UpdateFunc: jm.updatePod,
      DeleteFunc: jm.deletePod,
   })
   jm.podStore = podInformer.Lister()
   jm.podStoreSynced = podInformer.Informer().HasSynced

   jm.updateStatusHandler = jm.updateJobStatus
   jm.patchJobHandler = jm.patchJob
   jm.syncHandler = jm.syncJob

   metrics.Register()

   return jm
}
```

这里有可以看到 EventHandler 的逻辑入口，下面会具体分析，我们先看 New 函数里的 podControl，看下具体定义：

#### podControl

这个属性的定义如下

```go
podControl controller.PodControlInterface
```

PodControlInterface 是一个接口类型，定义如下：

```go
type PodControlInterface interface {
   CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
   CreatePodsWithGenerateName(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error
   DeletePod(namespace string, podID string, object runtime.Object) error
   PatchPod(namespace, name string, data []byte) error
}
```

这个接口就是用来创建、删除 pod 的。前面实例化的时候传递的是：

```go
controller.RealPodControl{
   KubeClient: kubeClient,
   Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
},
```

RealPodControl 是一个结构体，具体实现了上述 PodControlInterface 接口。

## EventHandler

### Job AddFunc DeleteFunc

前面我们看到这段代码：

```go
	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			jm.enqueueController(obj, true)
		},
		UpdateFunc: jm.updateJob,
		DeleteFunc: func(obj interface{}) {
			jm.enqueueController(obj, true)
		},
	})
```

可以看到在 Job 类型资源对象实例新增和删除的时候，都是执行的 `jm.enqueueController(obj, true)`，我们继续看这个方法的逻辑：

- **pkg/controller/job/job_controller.go:417**

```go
func (jm *Controller) enqueueController(obj interface{}, immediate bool) {
   // 计算 key
   key, err := controller.KeyFunc(obj)
   if err != nil {
      utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
      return
   }
   // 如果指定了 immediate 就马上重试
   backoff := time.Duration(0)
   if !immediate {
      // 这里的重试间隔是用的 10s 乘以 2 的 n-1 次方，n 是 requeue 次数，也就是这个对象 requeue 到 workqueue 的次数
      backoff = getBackoff(jm.queue, key)
   }

   klog.Infof("enqueueing job %s", key)
   // 加入到 workqueue，这是一个限速队列
   jm.queue.AddAfter(key, backoff)
}
```

### Job UpdateFunc

继续看 Job 类型资源对象实例更新到时候代码逻辑，这里相比 AddFunc 主要多了一个 ActiveDeadlineSeconds 的处理逻辑：

```go
func (jm *Controller) updateJob(old, cur interface{}) {
   oldJob := old.(*batch.Job)
   curJob := cur.(*batch.Job)

   key, err := controller.KeyFunc(curJob)
   if err != nil {
      return
   }
   // 这里和 AddFunc 逻辑一样，新对象加入到 workqueue
   jm.enqueueController(curJob, true)
   // check if need to add a new rsync for ActiveDeadlineSeconds
   if curJob.Status.StartTime != nil {
      curADS := curJob.Spec.ActiveDeadlineSeconds
      // 检查新对象是否配置 ActiveDeadlineSeconds，没有则直接返回
      if curADS == nil {
         return
      }
      // 到这里说明新对象配置了 ActiveDeadlineSeconds
      oldADS := oldJob.Spec.ActiveDeadlineSeconds
      // 下面逻辑是计算 ActiveDeadlineSeconds 配置的时间 - 当前已经使用的时间 得到一个剩余到期时间，然后将这个延时写入延时队列，从而实现在 ActiveDeadlineSeconds 到期时调谐流程可以被触发
      if oldADS == nil || *oldADS != *curADS {
         now := metav1.Now()
         start := curJob.Status.StartTime.Time
         passed := now.Time.Sub(start)
         total := time.Duration(*curADS) * time.Second
         // AddAfter will handle total < passed
         jm.queue.AddAfter(key, total-passed)
         klog.V(4).Infof("job %q ActiveDeadlineSeconds updated, will rsync after %d seconds", key, total-passed)
      }
   }
}
```

### Pod AddFunc

继续看 Pod 新增时对应到操作，我们在前面看到过这几行代码：

```go
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jm.addPod,
		UpdateFunc: jm.updatePod,
		DeleteFunc: func(obj interface{}) {
			jm.deletePod(obj, true)
		},
	})
```

这里可以看到 Pod 增删改时分别对应的代码处理入口，先来看 add 动作：

- **pkg/controller/job/job_controller.go:232**

```go
func (jm *Controller) addPod(obj interface{}) {
   pod := obj.(*v1.Pod)
   if pod.DeletionTimestamp != nil {
      // 当控制器重启的时候可以会收到一个处于删除状态的 Pod 新增事件
      jm.deletePod(pod, false)
      return
   }

   if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
      // 查询这个 pod 归哪个 job 管
      job := jm.resolveControllerRef(pod.Namespace, controllerRef)
      if job == nil {
         return
      }
      jobKey, err := controller.KeyFunc(job)
      if err != nil {
         return
      }
      // 记录动作
      jm.expectations.CreationObserved(jobKey)
      // 将发生 pod 新增事件对应的 job 加入到 workqueue
      jm.enqueueController(job, true)
      return
   }

   // 如果没有 controllerRef 配置，那么这是一个 孤儿 pod，这时候还是通知对应的 job 来认领这个 pod
   for _, job := range jm.getPodJobs(pod) {
      jm.enqueueController(job, true)
   }
}
```

### Pod UpdateFunc

继续来看 Pod 对象的更新动作：

```go
func (jm *Controller) updatePod(old, cur interface{}) {
   curPod := cur.(*v1.Pod)
   oldPod := old.(*v1.Pod)
   if curPod.ResourceVersion == oldPod.ResourceVersion {
      // 周期性的 resync 会触发 Update 事件，通过比较 RV 可以过滤一些不必要的操作
      return
   }
   if curPod.DeletionTimestamp != nil {
      // 如果是正在被删除的 Pod
      jm.deletePod(curPod, false)
      return
   }

   // 如果 Pod 已经跪了，immediate 就为 false
   immediate := curPod.Status.Phase != v1.PodFailed

   // 新 pod 是否没有 finalizer 配置
   finalizerRemoved := !hasJobTrackingFinalizer(curPod)
   curControllerRef := metav1.GetControllerOf(curPod)
   oldControllerRef := metav1.GetControllerOf(oldPod)
   // 判断新旧 Pod 是否属于同一个控制器管理
   controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
   if controllerRefChanged && oldControllerRef != nil {
      // 如果 ControllerRef 已经变了，这时候需要通知旧的控制器
      if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
         if finalizerRemoved {
            key, err := controller.KeyFunc(job)
            if err == nil {
               jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID))
            }
         }
         jm.enqueueController(job, immediate)
      }
   }

   // 如果 ControllerRef 一致
   if curControllerRef != nil {
      // 提取这个 pod 对应的所属 job
      job := jm.resolveControllerRef(curPod.Namespace, curControllerRef)
      if job == nil {
         return
      }
      if finalizerRemoved {
         key, err := controller.KeyFunc(job)
         if err == nil {
            jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID))
         }
      }
      // 加入到 workqueue
      jm.enqueueController(job, immediate)
      return
   }

   // 代码如果执行到这里，说明这是一个孤儿 pod，这时候尝试寻找一个能够认领的 job，如果有则通知其认领
   labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
   if labelChanged || controllerRefChanged {
      for _, job := range jm.getPodJobs(curPod) {
         jm.enqueueController(job, immediate)
      }
   }
}
```

### Pod DeleteFunc

最后看下 Pod 删除时的操作

```go
func (jm *Controller) deletePod(obj interface{}, final bool) {
   pod, ok := obj.(*v1.Pod)

   // 处理 DeletedFinalStateUnknown 场景
   if !ok {
      tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
      if !ok {
         utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
         return
      }
      pod, ok = tombstone.Obj.(*v1.Pod)
      if !ok {
         utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
         return
      }
   }
   // 和前面类似，查询这个 pod 归哪个 job 管
   controllerRef := metav1.GetControllerOf(pod)
   if controllerRef == nil {
      // No controller should care about orphans being deleted.
      return
   }
   job := jm.resolveControllerRef(pod.Namespace, controllerRef)
   if job == nil {
      if hasJobTrackingFinalizer(pod) {
         jm.enqueueOrphanPod(pod)
      }
      return
   }
   jobKey, err := controller.KeyFunc(job)
   if err != nil {
      return
   }
   jm.expectations.DeletionObserved(jobKey)

   if final || !hasJobTrackingFinalizer(pod) {
      jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID))
   }
   // 加入 workqueue
   jm.enqueueController(job, true)
}
```

