概述

源码版本:kubernetes master 分支 commit-fe62fc(2021年10月14日)

Job 是主要的 Kubernetes 原生 Workload 资源之一,是在 Kubernetes 之上运行批处理任务最简单的方式,在 AI 模型训练等场景下最基础的实现版本就是拉起一个 Job 来完成一次训练任务,然后才是各种自定义 “Job” 实现进阶处理,比如分布式训练需要一个 “Job” 同时拉起多个 Pod,但是每个 Pod 的启动参数会有差异。所以深入理解 Job 的功能和实现细节是进一步开发自定义 “Job” 类型工作负载的基础。

我们在《Kubernetes Job Controller 原理和源码分析(一)》中详细介绍了 Job 的特性,今天我们继续从源码角度剖析下 Job 的实现。

注意:阅读 Job 源码需要有一定的自定义控制器工作原理基础,里面涉及到了 Informer 工作机制、workqueue(延时/限速工作队列)、ResourceEventHandler 等等逻辑,没有相关知识储备直接看本文会有一定挑战,建议先阅读《深入理解 K8S 原理与实现》系列目录里列的相关文章。


《Kubernetes Job Controller 原理和源码分析》分为三讲:

程序入口

Job 控制器代码入口在 pkg/controller/job 包的 job_controller.go 源文件里。在这个源文件里可以看到一个 NewController() 函数,用于新建一个 Job controller,这个 controller 也就是用来调谐 job 对象和其对应的 pods 的控制器。另外 Controller 对象有一个 Run() 方法,用于启动这个控制器的主流程,开始 watch 和 sync 所有的 job 对象。

这里的组织方式还是很清晰,一个 NewXxx() 函数配合一个 Run() 方法,完成一个对象的初始化和启动流程。如果向上再跟一级 Run() 方法的调用入口,我们还可以看到 cmd 里有这样一段代码:

  • cmd/kube-controller-manager/app/batch.go:34
1
2
3
4
5
6
7
8
func startJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
	go job.NewController(
		controllerContext.InformerFactory.Core().V1().Pods(),
		controllerContext.InformerFactory.Batch().V1().Jobs(),
		controllerContext.ClientBuilder.ClientOrDie("job-controller"),
	).Run(int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Done())
	return nil, true, nil
}

这里的逻辑是调用 job.NewController() 方法获取一个 Job controller 然后调用其 Run() 方法来完成控制器启动逻辑,前者的三个参数分别是 podInformer、jobInformer 和 kubeClient,后者的参数主要是并发数,也就是同时开启几个调谐 loop。

Job controller 的创建

Controller 对象

回到 NewController() 函数,我们看下里面的主要逻辑。函数声明是这样的:

1
func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller

参数有三个:

  • podInformer coreinformers.PodInformer
  • jobInformer batchinformers.JobInformer
  • kubeClient clientset.Interface

这里的 PodInformer 和 JobInformer 引用在初始化 Job controller 的时候用来设置 EventHandlerFunc,另外设置 Job controller 的 jobLister 和 podStore 属性;kubeClient 是 clientset.Interface 类型,也就是可以用来分别 Get Job 和 Pod,后面会讲到。

返回值是 *Controller 类型,至于为什么不叫做 JobController,咱也不知道,咱也不敢问,反正 DeploymentController、ReplicaSetController 等命名都是带上类型的,这里多多少少给人感觉不清晰。不同开发者的风格吧。先看一下类型定义:

  • pkg/controller/job/job_controller.go:80
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type Controller struct {
   kubeClient clientset.Interface
   podControl controller.PodControlInterface
   updateStatusHandler func(job *batch.Job) error
   patchJobHandler     func(job *batch.Job, patch []byte) error
   syncHandler         func(jobKey string) (bool, error)
   podStoreSynced cache.InformerSynced // pod 存储是否至少更新过一次
   jobStoreSynced cache.InformerSynced // job 存储是否至少更新过一次
   expectations controller.ControllerExpectationsInterface
   jobLister batchv1listers.JobLister // jobs 存储
   podStore corelisters.PodLister // pods 存储
   queue workqueue.RateLimitingInterface // 需要更新的 Job 队列,限速队列实现
   orphanQueue workqueue.RateLimitingInterface // 孤儿 pods 队列,用来给 Job 追踪 finalizer 执行删除
   recorder record.EventRecorder // 用于发送 Event
}

后面在具体的函数中看下上面的属性都有些啥作用。

NewController()

这里有一堆的方法调用,我们先看下整体流程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
   // event 初始化,我们在《Kubernetes Event 原理和源码分析》中详细介绍过 event 的逻辑
   eventBroadcaster := record.NewBroadcaster()
   eventBroadcaster.StartStructuredLogging(0)
   eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

   // ……

   // 初始化 Controller,jm 应该是 job manager 的意思
   jm := &Controller{
      kubeClient: kubeClient,
      podControl: controller.RealPodControl{
         KubeClient: kubeClient,
         Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
      },
      expectations: controller.NewControllerExpectations(),
      // 限速队列我们在《Kubernetes client-go 源码分析 - workqueue》中详细介绍过
      queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
      orphanQueue:  workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"),
      recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
   }

   // 配置 jobInformer 的 ResourceEventHandler
   jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc: func(obj interface{}) {
         jm.enqueueController(obj, true) // 增
      },
      UpdateFunc: jm.updateJob, // 改
      DeleteFunc: func(obj interface{}) {
         jm.enqueueController(obj, true) // 删
      },
   })
   jm.jobLister = jobInformer.Lister()
   jm.jobStoreSynced = jobInformer.Informer().HasSynced

   // 配置 jobInformer 的 ResourceEventHandler
   podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc:    jm.addPod,
      UpdateFunc: jm.updatePod,
      DeleteFunc: jm.deletePod,
   })
   jm.podStore = podInformer.Lister()
   jm.podStoreSynced = podInformer.Informer().HasSynced

   jm.updateStatusHandler = jm.updateJobStatus
   jm.patchJobHandler = jm.patchJob
   jm.syncHandler = jm.syncJob

   metrics.Register()

   return jm
}

这里有可以看到 EventHandler 的逻辑入口,下面会具体分析,我们先看 New 函数里的 podControl,看下具体定义:

podControl

这个属性的定义如下

1
podControl controller.PodControlInterface

PodControlInterface 是一个接口类型,定义如下:

1
2
3
4
5
6
type PodControlInterface interface {
   CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
   CreatePodsWithGenerateName(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error
   DeletePod(namespace string, podID string, object runtime.Object) error
   PatchPod(namespace, name string, data []byte) error
}

这个接口就是用来创建、删除 pod 的。前面实例化的时候传递的是:

1
2
3
4
controller.RealPodControl{
   KubeClient: kubeClient,
   Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
},

RealPodControl 是一个结构体,具体实现了上述 PodControlInterface 接口。

EventHandler

Job AddFunc DeleteFunc

前面我们看到这段代码:

1
2
3
4
5
6
7
8
9
	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			jm.enqueueController(obj, true)
		},
		UpdateFunc: jm.updateJob,
		DeleteFunc: func(obj interface{}) {
			jm.enqueueController(obj, true)
		},
	})

可以看到在 Job 类型资源对象实例新增和删除的时候,都是执行的 jm.enqueueController(obj, true),我们继续看这个方法的逻辑:

  • pkg/controller/job/job_controller.go:417
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (jm *Controller) enqueueController(obj interface{}, immediate bool) {
   // 计算 key
   key, err := controller.KeyFunc(obj)
   if err != nil {
      utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
      return
   }
   // 如果指定了 immediate 就马上重试
   backoff := time.Duration(0)
   if !immediate {
      // 这里的重试间隔是用的 10s 乘以 2 的 n-1 次方,n 是 requeue 次数,也就是这个对象 requeue 到 workqueue 的次数
      backoff = getBackoff(jm.queue, key)
   }

   klog.Infof("enqueueing job %s", key)
   // 加入到 workqueue,这是一个限速队列
   jm.queue.AddAfter(key, backoff)
}

Job UpdateFunc

继续看 Job 类型资源对象实例更新到时候代码逻辑,这里相比 AddFunc 主要多了一个 ActiveDeadlineSeconds 的处理逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (jm *Controller) updateJob(old, cur interface{}) {
   oldJob := old.(*batch.Job)
   curJob := cur.(*batch.Job)

   key, err := controller.KeyFunc(curJob)
   if err != nil {
      return
   }
   // 这里和 AddFunc 逻辑一样,新对象加入到 workqueue
   jm.enqueueController(curJob, true)
   // check if need to add a new rsync for ActiveDeadlineSeconds
   if curJob.Status.StartTime != nil {
      curADS := curJob.Spec.ActiveDeadlineSeconds
      // 检查新对象是否配置 ActiveDeadlineSeconds,没有则直接返回
      if curADS == nil {
         return
      }
      // 到这里说明新对象配置了 ActiveDeadlineSeconds
      oldADS := oldJob.Spec.ActiveDeadlineSeconds
      // 下面逻辑是计算 ActiveDeadlineSeconds 配置的时间 - 当前已经使用的时间 得到一个剩余到期时间,然后将这个延时写入延时队列,从而实现在 ActiveDeadlineSeconds 到期时调谐流程可以被触发
      if oldADS == nil || *oldADS != *curADS {
         now := metav1.Now()
         start := curJob.Status.StartTime.Time
         passed := now.Time.Sub(start)
         total := time.Duration(*curADS) * time.Second
         // AddAfter will handle total < passed
         jm.queue.AddAfter(key, total-passed)
         klog.V(4).Infof("job %q ActiveDeadlineSeconds updated, will rsync after %d seconds", key, total-passed)
      }
   }
}

Pod AddFunc

继续看 Pod 新增时对应到操作,我们在前面看到过这几行代码:

1
2
3
4
5
6
7
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jm.addPod,
		UpdateFunc: jm.updatePod,
		DeleteFunc: func(obj interface{}) {
			jm.deletePod(obj, true)
		},
	})

这里可以看到 Pod 增删改时分别对应的代码处理入口,先来看 add 动作:

  • pkg/controller/job/job_controller.go:232
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (jm *Controller) addPod(obj interface{}) {
   pod := obj.(*v1.Pod)
   if pod.DeletionTimestamp != nil {
      // 当控制器重启的时候可以会收到一个处于删除状态的 Pod 新增事件
      jm.deletePod(pod, false)
      return
   }

   if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
      // 查询这个 pod 归哪个 job 管
      job := jm.resolveControllerRef(pod.Namespace, controllerRef)
      if job == nil {
         return
      }
      jobKey, err := controller.KeyFunc(job)
      if err != nil {
         return
      }
      // 记录动作
      jm.expectations.CreationObserved(jobKey)
      // 将发生 pod 新增事件对应的 job 加入到 workqueue
      jm.enqueueController(job, true)
      return
   }

   // 如果没有 controllerRef 配置,那么这是一个 孤儿 pod,这时候还是通知对应的 job 来认领这个 pod
   for _, job := range jm.getPodJobs(pod) {
      jm.enqueueController(job, true)
   }
}

Pod UpdateFunc

继续来看 Pod 对象的更新动作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func (jm *Controller) updatePod(old, cur interface{}) {
   curPod := cur.(*v1.Pod)
   oldPod := old.(*v1.Pod)
   if curPod.ResourceVersion == oldPod.ResourceVersion {
      // 周期性的 resync 会触发 Update 事件,通过比较 RV 可以过滤一些不必要的操作
      return
   }
   if curPod.DeletionTimestamp != nil {
      // 如果是正在被删除的 Pod
      jm.deletePod(curPod, false)
      return
   }

   // 如果 Pod 已经跪了,immediate 就为 false
   immediate := curPod.Status.Phase != v1.PodFailed

   // 新 pod 是否没有 finalizer 配置
   finalizerRemoved := !hasJobTrackingFinalizer(curPod)
   curControllerRef := metav1.GetControllerOf(curPod)
   oldControllerRef := metav1.GetControllerOf(oldPod)
   // 判断新旧 Pod 是否属于同一个控制器管理
   controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
   if controllerRefChanged && oldControllerRef != nil {
      // 如果 ControllerRef 已经变了,这时候需要通知旧的控制器
      if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
         if finalizerRemoved {
            key, err := controller.KeyFunc(job)
            if err == nil {
               jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID))
            }
         }
         jm.enqueueController(job, immediate)
      }
   }

   // 如果 ControllerRef 一致
   if curControllerRef != nil {
      // 提取这个 pod 对应的所属 job
      job := jm.resolveControllerRef(curPod.Namespace, curControllerRef)
      if job == nil {
         return
      }
      if finalizerRemoved {
         key, err := controller.KeyFunc(job)
         if err == nil {
            jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID))
         }
      }
      // 加入到 workqueue
      jm.enqueueController(job, immediate)
      return
   }

   // 代码如果执行到这里,说明这是一个孤儿 pod,这时候尝试寻找一个能够认领的 job,如果有则通知其认领
   labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
   if labelChanged || controllerRefChanged {
      for _, job := range jm.getPodJobs(curPod) {
         jm.enqueueController(job, immediate)
      }
   }
}

Pod DeleteFunc

最后看下 Pod 删除时的操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (jm *Controller) deletePod(obj interface{}, final bool) {
   pod, ok := obj.(*v1.Pod)

   // 处理 DeletedFinalStateUnknown 场景
   if !ok {
      tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
      if !ok {
         utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
         return
      }
      pod, ok = tombstone.Obj.(*v1.Pod)
      if !ok {
         utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
         return
      }
   }
   // 和前面类似,查询这个 pod 归哪个 job 管
   controllerRef := metav1.GetControllerOf(pod)
   if controllerRef == nil {
      // No controller should care about orphans being deleted.
      return
   }
   job := jm.resolveControllerRef(pod.Namespace, controllerRef)
   if job == nil {
      if hasJobTrackingFinalizer(pod) {
         jm.enqueueOrphanPod(pod)
      }
      return
   }
   jobKey, err := controller.KeyFunc(job)
   if err != nil {
      return
   }
   jm.expectations.DeletionObserved(jobKey)

   if final || !hasJobTrackingFinalizer(pod) {
      jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID))
   }
   // 加入 workqueue
   jm.enqueueController(job, true)
}

(转载请保留本文原始链接 https://www.danielhu.cn