
Kubernetes Job Controller: Principles and Source Code Analysis (Part 3)

Overview

Source code version: kubernetes master branch, commit fe62fc (October 14, 2021)

Job is one of the main native Kubernetes Workload resources and the simplest way to run batch workloads on Kubernetes. In scenarios such as AI model training, the most basic setup is to launch a Job to run a single training task; more advanced handling is then built on top of various custom "Job" implementations, for example distributed training needs a "Job" that launches multiple Pods at once, each with slightly different startup arguments. A solid understanding of Job's features and implementation details is therefore the foundation for developing custom "Job"-style workloads.

In "Kubernetes Job Controller: Principles and Source Code Analysis (Part 1)" we covered Job's features in detail, and in "Kubernetes Job Controller: Principles and Source Code Analysis (Part 2)" we followed the Job controller source from its entry point all the way to the EventHandler implementations. Today we pick up at the other end of the workqueue and look at the main reconciliation logic that runs after an item is dequeued.

Note: reading the Job source code requires some background on how custom controllers work, including the Informer mechanism, the workqueue (delaying work queue), ResourceEventHandler, and so on. Without that background this article will be challenging to follow; we recommend first reading the relevant articles listed in the 《深入理解 K8S 原理与实现》 (Understanding K8S Principles and Implementation in Depth) series index.


The "Kubernetes Job Controller: Principles and Source Code Analysis" series consists of three parts:

  • Part 1: Job features
  • Part 2: from the controller entry point to the EventHandler implementations
  • Part 3 (this article): the main reconciliation logic after items are dequeued from the workqueue

Startup of the Job controller

Let's continue with the Run() method:

  • pkg/controller/job/job_controller.go
func (jm *Controller) Run(workers int, stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
   defer jm.queue.ShutDown()
   defer jm.orphanQueue.ShutDown()

   klog.Infof("Starting job controller")
   defer klog.Infof("Shutting down job controller")

   if !cache.WaitForNamedCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
      return
   }

   for i := 0; i < workers; i++ {
      go wait.Until(jm.worker, time.Second, stopCh)
   }

   go wait.Until(jm.orphanWorker, time.Second, stopCh)

   <-stopCh
}
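
A quick aside before moving on: wait.Until (from k8s.io/apimachinery/pkg/util/wait) simply re-invokes the given function every period until the stop channel is closed, which is how Run() keeps each worker goroutine alive. A minimal, standalone sketch of that behavior (illustration only, not controller code):

package main

import (
   "fmt"
   "time"

   "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
   stopCh := make(chan struct{})
   go func() {
      time.Sleep(3 * time.Second)
      close(stopCh) // closing the channel ends the loop, just like Run() returning on <-stopCh
   }()

   // wait.Until calls the function, waits one second, then repeats until stopCh is closed.
   wait.Until(func() {
      fmt.Println("worker tick")
   }, time.Second, stopCh)
}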

You can see that the actual work happens in two kinds of workers, so let's look at what jm.worker and jm.orphanWorker do.

func (jm *Controller) worker() {
   for jm.processNextWorkItem() {
   }
}

The worker() method simply keeps calling processNextWorkItem(); the other one, orphanWorker(), follows the same pattern:

func (jm *Controller) orphanWorker() {
   for jm.processNextOrphanPod() {
   }
}

Let's keep going.

processNextWorkItem()

func (jm *Controller) processNextWorkItem() bool {
   key, quit := jm.queue.Get()
   if quit {
      return false
   }
   defer jm.queue.Done(key)

   forget, err := jm.syncHandler(key.(string)) // core logic
   if err == nil {
      if forget {
         jm.queue.Forget(key)
      }
      return true
   }

   utilruntime.HandleError(fmt.Errorf("syncing job: %w", err))
   if !apierrors.IsConflict(err) {
      jm.queue.AddRateLimited(key)
   }

   return true
}

The core logic here is a single line: the jm.syncHandler(key.(string)) call. Following syncHandler into its implementation leads to func (jm *Controller) syncJob(key string) (forget bool, rErr error), which contains the main reconciliation logic.
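
For context, syncHandler is just an indirection (handy for stubbing in unit tests); it is wired to syncJob when the controller is constructed, roughly like this (abbreviated excerpt from NewController; the exact lines may differ slightly in the commit referenced above):

   // pkg/controller/job/job_controller.go (NewController, abbreviated)
   jm.updateStatusHandler = jm.updateJobStatus
   jm.patchJobHandler = jm.patchJob
   jm.syncHandler = jm.syncJob

With that wiring in mind, let's look at syncJob in detail.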

Entry point of the core reconciliation logic - syncJob()

Once an item comes off the workqueue, the main business logic starts here. This method is long...

  • pkg/controller/job/job_controller.go:588
func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
   startTime := time.Now()
   defer func() {
      klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
   }()
   // the key is normally in the form namespace/name
   ns, name, err := cache.SplitMetaNamespaceKey(key)
   if err != nil {
      return false, err
   }
   if len(ns) == 0 || len(name) == 0 {
      return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
   }
   // it is called sharedJob because it comes from the local cache, via the lister/Indexer
   sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
   if err != nil {
      // if not found, the Job has been deleted in the meantime (e.g. by another goroutine); ignore it
      if apierrors.IsNotFound(err) {
         klog.V(4).Infof("Job has been deleted: %v", key)
         jm.expectations.DeleteExpectations(key)
         jm.finalizerExpectations.deleteExpectations(key)
         return true, nil
      }
      return false, err
   }
   // work on a deep copy to avoid mutating the cached object
   job := *sharedJob.DeepCopy()

   // a Job is considered finished if it has a JobCondition of type "Complete" or "Failed"
   if IsJobFinished(&job) {
      return true, nil
   }

   // the IndexedJob feature went beta in 1.22; if the gate is off but the Job is Indexed, it cannot be processed
   // isIndexedJob essentially checks .Spec.CompletionMode == "Indexed"
   if !feature.DefaultFeatureGate.Enabled(features.IndexedJob) && isIndexedJob(&job) {
      jm.recorder.Event(&job, v1.EventTypeWarning, "IndexedJobDisabled", "Skipped Indexed Job sync because feature is disabled.")
      return false, nil
   }
   // CompletionMode must be "NonIndexed" or "Indexed"; any other value is not recognized
   if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode != batch.NonIndexedCompletion && *job.Spec.CompletionMode != batch.IndexedCompletion {
      jm.recorder.Event(&job, v1.EventTypeWarning, "UnknownCompletionMode", "Skipped Job sync because completion mode is unknown")
      return false, nil
   }

   // record the current completionMode, defaulting to "NonIndexed"
   completionMode := string(batch.NonIndexedCompletion)
   if isIndexedJob(&job) {
      completionMode = string(batch.IndexedCompletion)
   }
   // "reconciling"
   action := metrics.JobSyncActionReconciling

   // metrics: record sync duration and count on return
   defer func() {
      result := "success"
      if rErr != nil {
         result = "error"
      }

      metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result, action).Observe(time.Since(startTime).Seconds())
      metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc()
   }()

   var expectedRmFinalizers sets.String
   var uncounted *uncountedTerminatedPods
   // track Pods with finalizers, an alpha feature in 1.22
   if trackingUncountedPods(&job) {
      klog.V(4).InfoS("Tracking uncounted Pods with pod finalizers", "job", klog.KObj(&job))
      if job.Status.UncountedTerminatedPods == nil {
         job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
      }
      uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
      expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key)
   // otherwise, remove the "batch.kubernetes.io/job-tracking" annotation from the Job
   } else if patch := removeTrackingAnnotationPatch(&job); patch != nil {
      if err := jm.patchJobHandler(&job, patch); err != nil {
         return false, fmt.Errorf("removing tracking finalizer from job %s: %w", key, err)
      }
   }

   jobNeedsSync := jm.expectations.SatisfiedExpectations(key)

   // fetch the Pods owned by this Job
   pods, err := jm.getPodsForJob(&job, uncounted != nil)
   if err != nil {
      return false, err
   }
   // a Pod counts as active if its PodPhase is neither of the terminal states "Succeeded" and "Failed"
   activePods := controller.FilterActivePods(pods)
   active := int32(len(activePods))
   // 计算 "Succeeded" 和 "Failed" 状态 pod 的数量
   succeeded, failed := getStatus(&job, pods, uncounted, expectedRmFinalizers)
   // this condition means the Job has just started (StartTime not yet set and not suspended), so .Status.StartTime needs to be set
   if job.Status.StartTime == nil && !jobSuspended(&job) {
      now := metav1.Now()
      job.Status.StartTime = &now
      // if ActiveDeadlineSeconds is set, requeue the Job so it is reconciled again once the deadline expires
      if job.Spec.ActiveDeadlineSeconds != nil {
         klog.V(4).Infof("Job %s has ActiveDeadlineSeconds will sync after %d seconds",
            key, *job.Spec.ActiveDeadlineSeconds)
         jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
      }
   }

   var manageJobErr error
   var finishedCondition *batch.JobCondition

   // a new failed Pod has appeared since the last status update
   jobHasNewFailure := failed > job.Status.Failed

   // a new failed Pod appeared, the number of active Pods differs from the parallelism, and the number of failed Pods exceeds the backoff limit
   exceedsBackoffLimit := jobHasNewFailure && (active != *job.Spec.Parallelism) &&
      (failed > *job.Spec.BackoffLimit)
   // pastBackoffLimitOnFailure checks whether, with restart policy OnFailure, the container restart counts exceed the limit
   if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
      // the backoff limit has been reached, so the Condition is updated to "Failed"
      finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit")
   // the active deadline has been exceeded
   } else if pastActiveDeadline(&job) {
      finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline")
   }
   // compute succeeded indexes for Indexed Jobs
   var prevSucceededIndexes, succeededIndexes orderedIntervals
   if isIndexedJob(&job) {
      prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(&job, pods)
      succeeded = int32(succeededIndexes.total())
   }
   suspendCondChanged := false
   // if the Job has failed, any Pods still in Active state must be deleted directly
   if finishedCondition != nil {
      deleted, err := jm.deleteActivePods(&job, activePods)
      if uncounted == nil {
         deleted = active
      } else if deleted != active {
         finishedCondition = nil
      }
      active -= deleted
      failed += deleted
      manageJobErr = err
   } else {
      manageJobCalled := false
      if jobNeedsSync && job.DeletionTimestamp == nil {
         // manageJob() is the core method that reconciles the number of running Pods against the Spec
         active, action, manageJobErr = jm.manageJob(&job, activePods, succeeded, succeededIndexes)
         manageJobCalled = true
      }
      // determine whether the Job has completed
      complete := false
      if job.Spec.Completions == nil {
         complete = succeeded > 0 && active == 0
      } else {
         complete = succeeded >= *job.Spec.Completions && active == 0
      }
      if complete {
         finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "")
      // Job suspension is a feature that went beta in 1.22
      } else if feature.DefaultFeatureGate.Enabled(features.SuspendJob) && manageJobCalled {
         // the Job is configured to be suspended
         if job.Spec.Suspend != nil && *job.Spec.Suspend {
            // only a Job that has not finished can be suspended
            var isUpdated bool
            job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended")
            if isUpdated {
               suspendCondChanged = true
               jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended", "Job suspended")
            }
         } else {
            // resuming from the suspended state
            var isUpdated bool
            job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "JobResumed", "Job resumed")
            if isUpdated {
               suspendCondChanged = true
               jm.recorder.Event(&job, v1.EventTypeNormal, "Resumed", "Job resumed")
               now := metav1.Now()
               // reset StartTime
               job.Status.StartTime = &now
            }
         }
      }
   }

   forget = false
   // check whether the number of succeeded Pods has grown
   if job.Status.Succeeded < succeeded {
      forget = true
   }

   if uncounted != nil {
      // the suspend condition changed or the number of active Pods changed
      needsStatusUpdate := suspendCondChanged || active != job.Status.Active
      job.Status.Active = active
      // finalizer tracking: update status and remove Pod finalizers
      err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate)
      if err != nil {
         return false, fmt.Errorf("tracking status: %w", err)
      }
      jobFinished := IsJobFinished(&job)
      if jobHasNewFailure && !jobFinished {
         return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
      }
      forget = true
      return forget, manageJobErr
   }
   // remove the tracking finalizer from all Pods
   if err := jm.removeTrackingFinalizersFromAllPods(pods); err != nil {
      return false, fmt.Errorf("removing disabled finalizers from job pods %s: %w", key, err)
   }

   // check whether the status needs to be updated
   if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || suspendCondChanged || finishedCondition != nil {
      job.Status.Active = active
      job.Status.Succeeded = succeeded
      job.Status.Failed = failed
      if isIndexedJob(&job) {
         job.Status.CompletedIndexes = succeededIndexes.String()
      }
      job.Status.UncountedTerminatedPods = nil
      jm.enactJobFinished(&job, finishedCondition)

      if _, err := jm.updateStatusHandler(&job); err != nil {
         return forget, err
      }

      if jobHasNewFailure && !IsJobFinished(&job) {
         // returning an error will re-enqueue Job after the backoff period
         return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
      }

      forget = true
   }

   return forget, manageJobErr
}
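
Several small helpers referenced in the comments above are worth spelling out. The sketches below are simplified reconstructions consistent with how syncJob uses them; the real helpers live in the job controller package and may differ in minor details:

package job

import (
   "time"

   batch "k8s.io/api/batch/v1"
   v1 "k8s.io/api/core/v1"
   metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// IsJobFinished reports whether the Job carries a "Complete" or "Failed"
// condition with status True, i.e. it has reached a terminal state.
func IsJobFinished(j *batch.Job) bool {
   for _, c := range j.Status.Conditions {
      if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
         return true
      }
   }
   return false
}

// jobSuspended reports whether .spec.suspend is set and true.
func jobSuspended(job *batch.Job) bool {
   return job.Spec.Suspend != nil && *job.Spec.Suspend
}

// pastActiveDeadline reports whether the Job has been active for longer
// than .spec.activeDeadlineSeconds, measured from .status.startTime.
func pastActiveDeadline(job *batch.Job) bool {
   if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
      return false
   }
   duration := metav1.Now().Time.Sub(job.Status.StartTime.Time)
   return duration >= time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second
}

These are internal helpers rather than exported API, so treat the sketches as reading aids, not as something to import.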

Managing the number of Pods - manageJob()

syncJob() above calls manageJob(), which controls how many active Pods a Job should have, deleting the extras and creating the missing ones. This method is also long...

  • pkg/controller/job/job_controller.go:1245
func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval) (int32, string, error) {
   // number of currently running (active) Pods
   active := int32(len(activePods))
   // desired parallelism
   parallelism := *job.Spec.Parallelism
   jobKey, err := controller.KeyFunc(job)
   if err != nil {
      utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
      return 0, metrics.JobSyncActionTracking, nil
   }
   // the Job is suspended
   if jobSuspended(job) {
      klog.V(4).InfoS("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
      podsToDelete := activePodsForRemoval(job, activePods, int(active))
      jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
      // suspension means all active Pods must be deleted
      removed, err := jm.deleteJobPods(job, jobKey, podsToDelete)
      active -= removed
      return active, metrics.JobSyncActionPodsDeleted, err
   }

   wantActive := int32(0)
   if job.Spec.Completions == nil {
      // Completions is not set: keep the number of running Pods equal to the parallelism, and the Job is successful once any Pod succeeds
      if succeeded > 0 {
         wantActive = active
      } else {
         wantActive = parallelism
      }
   } else {
      // Completions is set: the number of running Pods should not exceed Completions minus the number already succeeded
      wantActive = *job.Spec.Completions - succeeded
      // and must not exceed the parallelism
      if wantActive > parallelism {
         wantActive = parallelism
      }
      if wantActive < 0 {
         wantActive = 0
      }
   }
   // if more Pods are active than desired, some need to be deleted
   rmAtLeast := active - wantActive
   if rmAtLeast < 0 {
      rmAtLeast = 0
   }
   // pick which Pods should be removed
   podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast))
   if len(podsToDelete) > MaxPodCreateDeletePerSync {
      podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync]
   }
   // perform the deletions
   if len(podsToDelete) > 0 {
      jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
      klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
      removed, err := jm.deleteJobPods(job, jobKey, podsToDelete)
      active -= removed
      return active, metrics.JobSyncActionPodsDeleted, err
   }
   // fewer Pods are running than desired
   if active < wantActive {
      diff := wantActive - active
      // cap the number of creations per sync at the limit of 500 (MaxPodCreateDeletePerSync)
      if diff > int32(MaxPodCreateDeletePerSync) {
         diff = int32(MaxPodCreateDeletePerSync)
      }

      jm.expectations.ExpectCreations(jobKey, int(diff))
      errCh := make(chan error, diff)
      klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

      wait := sync.WaitGroup{}

      var indexesToAdd []int
      if isIndexedJob(job) {
         indexesToAdd = firstPendingIndexes(activePods, succeededIndexes, int(diff), int(*job.Spec.Completions))
         diff = int32(len(indexesToAdd))
      }
      active += diff
      // take a copy of the Pod template
      podTemplate := job.Spec.Template.DeepCopy()
      if isIndexedJob(job) {
         addCompletionIndexEnvVariables(podTemplate)
      }
      if trackingUncountedPods(job) {
         podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)
      }
      // batchSize starts at 1 and grows exponentially (2, 4, 8, ...); this "slow start" avoids trying to create a large number of Pods at once only to have them all fail for the same reason
      for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
         errorCount := len(errCh)
         wait.Add(int(batchSize))
         for i := int32(0); i < batchSize; i++ {
            // defaults to -1 (unknownCompletionIndex)
            completionIndex := unknownCompletionIndex
            if len(indexesToAdd) > 0 {
               completionIndex = indexesToAdd[0]
               indexesToAdd = indexesToAdd[1:]
            }
            go func() {
               template := podTemplate
               generateName := ""
               if completionIndex != unknownCompletionIndex {
                  template = podTemplate.DeepCopy()
                  addCompletionIndexAnnotation(template, completionIndex)
                  // set the Hostname for the indexed Pod
                  template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex)
                  generateName = podGenerateNameWithIndex(job.Name, completionIndex)
               }
               defer wait.Done()
               // create the Pod
               err := jm.podControl.CreatePodsWithGenerateName(job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName)
               if err != nil {
                  if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                     return
                  }
               }
               if err != nil {
                  defer utilruntime.HandleError(err)
                  klog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
                  jm.expectations.CreationObserved(jobKey)
                  atomic.AddInt32(&active, -1)
                  errCh <- err
               }
            }()
         }
         wait.Wait()
         skippedPods := diff - batchSize
         if errorCount < len(errCh) && skippedPods > 0 {
            klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
            active -= skippedPods
            for i := int32(0); i < skippedPods; i++ {
               jm.expectations.CreationObserved(jobKey)
            }
            // the skipped Pods will be retried via slow start in the next reconciliation
            break
         }
         // subtract the number handled in this batch
         diff -= batchSize
      }
      return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
   }

   return active, metrics.JobSyncActionTracking, nil
}
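
To make the "slow start" loop above more concrete, here is a standalone sketch (not controller code) that reproduces just the batch-size progression: it starts from SlowStartInitialBatchSize (1 in the controller), doubles every round, and is always capped by the number of Pods still to be created:

package main

import "fmt"

// slowStartBatches mimics the batch sizing used by manageJob(): start small,
// double each successful round, never exceed the remaining Pod count.
func slowStartBatches(diff, initial int32) []int32 {
   var batches []int32
   batchSize := min32(diff, initial)
   for diff > 0 {
      batches = append(batches, batchSize)
      diff -= batchSize
      batchSize = min32(2*batchSize, diff)
   }
   return batches
}

func min32(a, b int32) int32 {
   if a < b {
      return a
   }
   return b
}

func main() {
   // Creating 20 Pods proceeds in batches of 1, 2, 4, 8 and finally 5.
   fmt.Println(slowStartBatches(20, 1)) // [1 2 4 8 5]
}

In the real controller a batch also stops early when creations start failing, and the skipped Pods are simply picked up again on the next sync.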

Summary

The logic implemented by the Job controller is not complicated; as with the features we introduced in "Kubernetes Job Controller: Principles and Source Code Analysis (Part 1)", there is not that much functionality for a Job to support. Fully understanding the Job controller source, however, is not easy. On the one hand it requires a good grasp of the controller pattern itself and the overall controller workflow; on the other hand the implementation contains a lot of robustness code, which inflates the code well beyond the core functionality, and untangling every detail takes real mental effort. Finally, the way the source is organized is genuinely uncomfortable to read: hundreds of lines crammed into a single function is not friendly to the reader.