This article walks through how the Kubernetes Job Controller is constructed. The explanation follows the code from top to bottom and aims to be simple and clear, so feel free to read along and dig into each step.
Implementation Flow
Without further ado, the complete flow is laid out below, section by section.
New JobController
type JobController struct {
	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	// To allow injection of updateJobStatus for testing.
	updateHandler func(job *batch.Job) error
	syncHandler   func(jobKey string) (bool, error)
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced cache.InformerSynced
	// jobStoreSynced returns true if the job store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	jobStoreSynced cache.InformerSynced

	// A TTLCache of pod creates/deletes each rc expects to see
	expectations controller.ControllerExpectationsInterface

	// A store of jobs
	jobLister batchv1listers.JobLister

	// A store of pods, populated by the podController
	podStore corelisters.PodLister

	// Jobs that need to be updated
	queue workqueue.RateLimitingInterface

	recorder record.EventRecorder
}

func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	// TODO: remove the wrapper when every clients have moved to use the clientset.
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})

	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
	}

	jm := &JobController{
		kubeClient: kubeClient,
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
		},
		expectations: controller.NewControllerExpectations(),
		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
		recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
	}

	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jm.enqueueController,
		UpdateFunc: jm.updateJob,
		DeleteFunc: jm.enqueueController,
	})
	jm.jobLister = jobInformer.Lister()
	jm.jobStoreSynced = jobInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jm.addPod,
		UpdateFunc: jm.updatePod,
		DeleteFunc: jm.deletePod,
	})
	jm.podStore = podInformer.Lister()
	jm.podStoreSynced = podInformer.Informer().HasSynced

	jm.updateHandler = jm.updateJobStatus
	jm.syncHandler = jm.syncJob

	return jm
}
Construct the JobController and initialize its fields, such as the rate-limited work queue;
watch Pod and Job objects;
register add/update/delete event handlers on the podInformer;
register add/update/delete event handlers on the jobInformer;
register updateHandler as updateJobStatus, which is used to update a Job's status;
register syncHandler as syncJob, which is used to process the Jobs taken from the queue. (A minimal wiring sketch is shown below.)
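To make the constructor concrete, here is a minimal, hedged sketch of how it could be wired up with client-go shared informer factories and then started. It is not from the controller's own source: the kubeconfig path, the resync period, and the worker count of 5 (kube-controller-manager's default ConcurrentJobSyncs) are illustrative, and it assumes the NewJobController and Run shown in these excerpts are in scope.
package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Illustrative kubeconfig path; in-cluster config would also work.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	kubeClient := kubernetes.NewForConfigOrDie(cfg)

	// The shared informer factory provides the pod and job informers that
	// NewJobController registers its event handlers on.
	factory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second)

	stopCh := make(chan struct{})
	defer close(stopCh)

	jm := NewJobController(factory.Core().V1().Pods(), factory.Batch().V1().Jobs(), kubeClient)

	// Start the informers so the listers and HasSynced functions are populated,
	// then run the controller workers.
	factory.Start(stopCh)
	jm.Run(5, stopCh)
}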
JobController Run
// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer jm.queue.ShutDown()

	glog.Infof("Starting job controller")
	defer glog.Infof("Shutting down job controller")

	if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
		return
	}
	for i := 0; i < workers; i++ {
		go wait.Until(jm.worker, time.Second, stopCh)
	}
	<-stopCh
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *JobController) worker() {
	for jm.processNextWorkItem() {
	}
}

func (jm *JobController) processNextWorkItem() bool {
	key, quit := jm.queue.Get()
	if quit {
		return false
	}
	defer jm.queue.Done(key)

	forget, err := jm.syncHandler(key.(string))
	if err == nil {
		if forget {
			jm.queue.Forget(key)
		}
		return true
	}

	utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
	jm.queue.AddRateLimited(key)
	return true
}
WaitForCacheSync waits for the JobController's pod and job informer caches to sync;
start workers goroutines (kube-controller-manager's ConcurrentJobSyncs defaults to 5); each goroutine runs worker, waits 1s after it returns, and runs it again in a loop until stopCh is closed;
worker pulls a job key from the queue and calls syncJob on it. If syncJob succeeds and asks to forget, the key is forgotten (i.e. the rate limiter stops tracking it); otherwise the key is added back to the queue with rate limiting and synced again later. A standalone sketch of this queue pattern follows.
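The Forget / AddRateLimited pattern can be seen in isolation in this standalone sketch. It is not the controller's code: the handler, key, and backoff bounds are made up for illustration, and only the client-go workqueue calls are real.
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func fakeSync(key string) error {
	fmt.Println("syncing", key)
	return nil
}

func main() {
	// Exponential per-item backoff, analogous to the controller's DefaultJobBackOff/MaxJobBackOff pair.
	limiter := workqueue.NewItemExponentialFailureRateLimiter(10*time.Millisecond, 5*time.Second)
	queue := workqueue.NewNamedRateLimitingQueue(limiter, "demo")

	queue.Add("default/my-job")

	key, shutdown := queue.Get()
	if shutdown {
		return
	}
	defer queue.Done(key)

	if err := fakeSync(key.(string)); err != nil {
		// Failure: requeue with exponential backoff, mirroring AddRateLimited in processNextWorkItem.
		queue.AddRateLimited(key)
		return
	}
	// Success: clear the backoff history for this key, mirroring Forget.
	queue.Forget(key)
}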
syncJob
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jm *JobController) syncJob(key string) (bool, error) {
	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
	}()

	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return false, err
	}
	if len(ns) == 0 || len(name) == 0 {
		return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
	}
	sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			glog.V(4).Infof("Job has been deleted: %v", key)
			jm.expectations.DeleteExpectations(key)
			return true, nil
		}
		return false, err
	}
	job := *sharedJob

	// if job was finished previously, we don't want to redo the termination
	if IsJobFinished(&job) {
		return true, nil
	}

	// retrieve the previous number of retry
	previousRetry := jm.queue.NumRequeues(key)

	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
	// the store after we've checked the expectation, the job sync is just deferred till the next relist.
	jobNeedsSync := jm.expectations.SatisfiedExpectations(key)

	pods, err := jm.getPodsForJob(&job)
	if err != nil {
		return false, err
	}

	activePods := controller.FilterActivePods(pods)
	active := int32(len(activePods))
	succeeded, failed := getStatus(pods)
	conditions := len(job.Status.Conditions)
	// job first start
	if job.Status.StartTime == nil {
		now := metav1.Now()
		job.Status.StartTime = &now
		// enqueue a sync to check if job past ActiveDeadlineSeconds
		if job.Spec.ActiveDeadlineSeconds != nil {
			glog.V(4).Infof("Job %s have ActiveDeadlineSeconds will sync after %d seconds",
				key, *job.Spec.ActiveDeadlineSeconds)
			jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
		}
	}

	var manageJobErr error
	jobFailed := false
	var failureReason string
	var failureMessage string

	jobHaveNewFailure := failed > job.Status.Failed

	// check if the number of failed jobs increased since the last syncJob
	if jobHaveNewFailure && (int32(previousRetry)+1 > *job.Spec.BackoffLimit) {
		jobFailed = true
		failureReason = "BackoffLimitExceeded"
		failureMessage = "Job has reach the specified backoff limit"
	} else if pastActiveDeadline(&job) {
		jobFailed = true
		failureReason = "DeadlineExceeded"
		failureMessage = "Job was active longer than specified deadline"
	}

	if jobFailed {
		errCh := make(chan error, active)
		jm.deleteJobPods(&job, activePods, errCh)
		select {
		case manageJobErr = <-errCh:
			if manageJobErr != nil {
				break
			}
		default:
		}

		// update status values accordingly
		failed += active
		active = 0
		job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
		jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
	} else {
		if jobNeedsSync && job.DeletionTimestamp == nil {
			active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
		}
		completions := succeeded
		complete := false
		if job.Spec.Completions == nil {
			// This type of job is complete when any pod exits with success.
			// Each pod is capable of
			// determining whether or not the entire Job is done. Subsequent pods are
			// not expected to fail, but if they do, the failure is ignored. Once any
			// pod succeeds, the controller waits for remaining pods to finish, and
			// then the job is complete.
			if succeeded > 0 && active == 0 {
				complete = true
			}
		} else {
			// Job specifies a number of completions. This type of job signals
			// success by having that number of successes. Since we do not
			// start more pods than there are remaining completions, there should
			// not be any remaining active pods once this count is reached.
			if completions >= *job.Spec.Completions {
				complete = true
				if active > 0 {
					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
				}
				if completions > *job.Spec.Completions {
					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
				}
			}
		}
		if complete {
			job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
			now := metav1.Now()
			job.Status.CompletionTime = &now
		}
	}

	forget := false
	// no need to update the job if the status hasn't changed since last time
	if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
		job.Status.Active = active
		job.Status.Succeeded = succeeded
		job.Status.Failed = failed

		if err := jm.updateHandler(&job); err != nil {
			return false, err
		}

		if jobHaveNewFailure && !IsJobFinished(&job) {
			// returning an error will re-enqueue Job after the backoff period
			return false, fmt.Errorf("failed pod(s) detected for job key %q", key)
		}

		forget = true
	}

	return forget, manageJobErr
}
Look up the Job in the informer indexer (via jobLister). If it no longer exists, delete its entry from expectations and finish with true; otherwise continue.
Use the Job's Complete or Failed conditions to decide whether it is already finished. If it is, finish with true; otherwise continue.
Call SatisfiedExpectations: if the ControlleeExpectations for this job show that the pending adds and dels are both <= 0, or the expectations have not been updated for more than 5 minutes, it returns true, and jobNeedsSync=true means a manageJob pass is needed.
For a job starting for the first time (StartTime == nil), set StartTime; and if ActiveDeadlineSeconds is not nil, re-enqueue the job after ActiveDeadlineSeconds so it is synced again once the deadline may have passed.
Get all pods managed by the job, filter out the activePods, and compute the numbers of active, succeeded and failed pods. If failed > job.Status.Failed, the job has gained new failed pods since the last sync, so jobHaveNewFailure is true.
If jobHaveNewFailure and the queue's retry count for this job plus 1 is greater than job.Spec.BackoffLimit (default 6), the job is BackoffLimitExceeded and jobFailed. If the time elapsed since the job's StartTime is >= ActiveDeadlineSeconds, the job is DeadlineExceeded and jobFailed (see the pastActiveDeadline sketch after this list).
If jobFailed, concurrently delete all of the activePods filtered out above (deleteJobPods waits on a sync.WaitGroup). On success, failed += active, active = 0, and a Failed condition is appended.
If the job is not failed, jobNeedsSync is true, and the job's DeletionTimestamp is empty (the job is not marked for deletion), call manageJob to add or delete the job's pods according to the policies described below.
If the job is not failed and job.Spec.Completions is nil, this type of job is complete when any pod exits with success; so if succeeded > 0 and active == 0, the job is complete.
If the job is not failed and job.Spec.Completions is not nil, this type of job signals success by reaching that number of successes; so if succeeded >= job.Spec.Completions, the job is complete.
If the job is complete, append a Complete condition and set CompletionTime.
Finally, invoke updateJobStatus to persist the job status (through the API server into etcd). If the update fails, return false and the job is re-queued. If jobHaveNewFailure is true and the job's conditions show it is not yet finished, also return false and the job is re-queued.
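For reference, the pastActiveDeadline helper behind the DeadlineExceeded check is roughly the following; this is reconstructed from the upstream controller of that era and may differ slightly from the exact version the article is based on.
// pastActiveDeadline checks whether the job has ActiveDeadlineSeconds set and whether it has been exceeded:
// a job with no deadline, or one that has not started yet, can never be past its deadline.
func pastActiveDeadline(job *batch.Job) bool {
	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
		return false
	}
	now := metav1.Now()
	start := job.Status.StartTime.Time
	duration := now.Time.Sub(start)
	allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
	return duration >= allowedDuration
}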
manageJob
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Does NOT modify <activePods>.
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
	var activeLock sync.Mutex
	active := int32(len(activePods))
	parallelism := *job.Spec.Parallelism
	jobKey, err := controller.KeyFunc(job)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
		return 0, nil
	}

	var errCh chan error
	if active > parallelism {
		diff := active - parallelism
		errCh = make(chan error, diff)
		jm.expectations.ExpectDeletions(jobKey, int(diff))
		glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		sort.Sort(controller.ActivePods(activePods))

		active -= diff
		wait := sync.WaitGroup{}
		wait.Add(int(diff))
		for i := int32(0); i < diff; i++ {
			go func(ix int32) {
				defer wait.Done()
				if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
					defer utilruntime.HandleError(err)
					// Decrement the expected number of deletes because the informer won't observe this deletion
					glog.V(2).Infof("Failed to delete %v, decrementing expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
					jm.expectations.DeletionObserved(jobKey)
					activeLock.Lock()
					active++
					activeLock.Unlock()
					errCh <- err
				}
			}(i)
		}
		wait.Wait()

	} else if active < parallelism {
		wantActive := int32(0)
		if job.Spec.Completions == nil {
			// Job does not specify a number of completions. Therefore, number active
			// should be equal to parallelism, unless the job has seen at least
			// once success, in which leave whatever is running, running.
			if succeeded > 0 {
				wantActive = active
			} else {
				wantActive = parallelism
			}
		} else {
			// Job specifies a specific number of completions. Therefore, number
			// active should not ever exceed number of remaining completions.
			wantActive = *job.Spec.Completions - succeeded
			if wantActive > parallelism {
				wantActive = parallelism
			}
		}
		diff := wantActive - active
		if diff < 0 {
			utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
			diff = 0
		}
		jm.expectations.ExpectCreations(jobKey, int(diff))
		errCh = make(chan error, diff)
		glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

		active += diff
		wait := sync.WaitGroup{}

		// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
		// and double with each successful iteration in a kind of "slow start".
		// This handles attempts to start large numbers of pods that would
		// likely all fail with the same error. For example a project with a
		// low quota that attempts to create a large number of pods will be
		// prevented from spamming the API service with the pod create requests
		// after one of its pods fails. Conveniently, this also prevents the
		// event spam that those failures would generate.
		for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
			errorCount := len(errCh)
			wait.Add(int(batchSize))
			for i := int32(0); i < batchSize; i++ {
				go func() {
					defer wait.Done()
					err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
					if err != nil && errors.IsTimeout(err) {
						// Pod is created but its initialization has timed out.
						// If the initialization is successful eventually, the
						// controller will observe the creation via the informer.
						// If the initialization fails, or if the pod keeps
						// uninitialized for a long time, the informer will not
						// receive any update, and the controller will create a new
						// pod when the expectation expires.
						return
					}
					if err != nil {
						defer utilruntime.HandleError(err)
						// Decrement the expected number of creates because the informer won't observe this pod
						glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
						jm.expectations.CreationObserved(jobKey)
						activeLock.Lock()
						active--
						activeLock.Unlock()
						errCh <- err
					}
				}()
			}
			wait.Wait()

			// any skipped pods that we never attempted to start shouldn't be expected.
			skippedPods := diff - batchSize
			if errorCount < len(errCh) && skippedPods > 0 {
				glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
				active -= skippedPods
				for i := int32(0); i < skippedPods; i++ {
					// Decrement the expected number of creates because the informer won't observe this pod
					jm.expectations.CreationObserved(jobKey)
				}
				// The skipped pods will be retried later. The next controller resync will
				// retry the slow start process.
				break
			}
			diff -= batchSize
		}
	}

	select {
	case err := <-errCh:
		// all errors have been reported before, we only need to inform the controller that there was an error and it should re-try this job once more next time.
		if err != nil {
			return active, err
		}
	default:
	}

	return active, nil
}
If active > job.Spec.Parallelism, the job needs to scale down:
Compute diff = active - parallelism and record diff expected deletions for this job in ControllerExpectations, meaning diff pods are about to be deleted (see the expectations sketch after these steps).
Sort activePods so that not-ready < ready, unscheduled < scheduled and pending < running, which ensures that pods in the earliest stages are deleted first whenever possible.
Update active (active -= diff) and delete those pods concurrently with a sync.WaitGroup. If deleting a pod fails, active is incremented by 1 and the expected deletions in expectations are decremented by 1.
Return active.
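The expectations bookkeeping used in both branches can be illustrated with the following standalone sketch. It is not the controller's code: the key and counts are made up, and it assumes the ControllerExpectations type from k8s.io/kubernetes/pkg/controller is importable as in that era. Deletions are normally observed via informer events; DeletionObserved is called manually only when a delete request fails, because no event will ever arrive for it.
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/controller"
)

func main() {
	exp := controller.NewControllerExpectations()
	jobKey := "default/my-job" // illustrative job key

	// About to issue 3 pod deletions: expect 3 delete events.
	exp.ExpectDeletions(jobKey, 3)
	fmt.Println(exp.SatisfiedExpectations(jobKey)) // false: 3 deletions still outstanding

	// One DELETE request failed, so the informer will never see that deletion;
	// lower the expectation manually, as the scale-down branch above does.
	exp.DeletionObserved(jobKey)

	// The informer observed the two real deletions (normally via the deletePod handler).
	exp.DeletionObserved(jobKey)
	exp.DeletionObserved(jobKey)
	fmt.Println(exp.SatisfiedExpectations(jobKey)) // true: nothing outstanding, the next sync may run manageJob again
}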
If active < job.Spec.Parallelism, the job needs to scale up:
If job.Spec.Completions is nil and succeeded > 0, diff is 0; if job.Spec.Completions is nil and succeeded == 0, diff is parallelism - active; if job.Spec.Completions is not nil, diff is min(job.Spec.Completions - succeeded, parallelism) - active.
Record diff expected creations for this job in ControllerExpectations, meaning diff pods are about to be created.
Update active (active += diff) and create the pods in batches with a sync.WaitGroup: the first batch creates 1 pod (SlowStartInitialBatchSize = 1 is hard-coded), the second 2, then 4, 8, 16 and so on, but never more than the remaining diff; after each batch, diff is decremented by the batch size. If any creation in a batch fails, active and the expected creations in expectations are adjusted accordingly, and the remaining, not yet started batches are skipped (see the slow-start sketch after these steps).
If active == job.Spec.Parallelism, just return active.
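The slow-start batch schedule described above can be isolated into the following small sketch; the diff value and the print statement are illustrative stand-ins for the concurrent pod creation.
package main

import "fmt"

func int32Min(a, b int32) int32 {
	if a < b {
		return a
	}
	return b
}

func main() {
	diff := int32(13)                 // pods still to be created
	const initialBatchSize = int32(1) // mirrors controller.SlowStartInitialBatchSize

	// Batch sizes double each round and are capped at the remaining diff,
	// so for diff=13 the schedule is 1, 2, 4, 6.
	for batchSize := int32Min(diff, initialBatchSize); diff > 0; batchSize = int32Min(2*batchSize, diff) {
		fmt.Printf("creating a batch of %d pods (%d remaining)\n", batchSize, diff)
		// ...create batchSize pods concurrently; if any creation fails, the
		// skipped (diff - batchSize) pods are dropped from expectations and
		// retried on the next sync, as in manageJob above...
		diff -= batchSize
	}
}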
Thanks for reading. That covers how the Kubernetes Job Controller is constructed; hopefully the walkthrough gives you a clearer picture, and the details are best confirmed by reading and exercising the code yourself.