久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Kubernetes Job Controller怎么構造

157次閱讀
沒有評論

共計 16744 個字符,預計需要花費 42 分鐘才能閱讀完成。

這篇文章主要講解了“Kubernetes Job Controller 怎么構造”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著丸趣 TV 小編的思路慢慢深入,一起來研究和學習“Kubernetes Job Controller 怎么構造”吧!

實現流程圖

廢話不多說,先把完整流程貼出來。

New JobController

type JobController struct {
 kubeClient clientset.Interface
 podControl controller.PodControlInterface
 // To allow injection of updateJobStatus for testing.
 updateHandler func(job *batch.Job) error
 syncHandler func(jobKey string) (bool, error)
 // podStoreSynced returns true if the pod store has been synced at least once.
 // Added as a member to the struct to allow injection for testing.
 podStoreSynced cache.InformerSynced
 // jobStoreSynced returns true if the job store has been synced at least once.
 // Added as a member to the struct to allow injection for testing.
 jobStoreSynced cache.InformerSynced
 // A TTLCache of pod creates/deletes each rc expects to see
 expectations controller.ControllerExpectationsInterface
 // A store of jobs
 jobLister batchv1listers.JobLister
 // A store of pods, populated by the podController
 podStore corelisters.PodLister
 // Jobs that need to be updated
 queue workqueue.RateLimitingInterface
 recorder record.EventRecorder

func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. eventBroadcaster.StartRecordingToSink(v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events()}) if kubeClient != nil   kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {metrics.RegisterMetricAndTrackRateLimiterUsage( job_controller , kubeClient.CoreV1().RESTClient().GetRateLimiter()) jm :=  JobController{ kubeClient: kubeClient, podControl: controller.RealPodControl{ KubeClient: kubeClient, Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component:  job-controller}), expectations: controller.NewControllerExpectations(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff),  job ), recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component:  job-controller}), jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: jm.enqueueController, UpdateFunc: jm.updateJob, DeleteFunc: jm.enqueueController, jm.jobLister = jobInformer.Lister() jm.jobStoreSynced = jobInformer.Informer().HasSynced podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: jm.addPod, UpdateFunc: jm.updatePod, DeleteFunc: jm.deletePod, jm.podStore = podInformer.Lister() jm.podStoreSynced = podInformer.Informer().HasSynced jm.updateHandler = jm.updateJobStatus jm.syncHandler = jm.syncJob return jm }

構造 JobController,并初始化相關數據,比如 rate limiter queue;

watch pod and job object;

注冊 podInformer 的 add/del/update EventHandler;

注冊 jobInformer 的 add/del/update EventHandler;

注冊 updataHandler 為 updateJobStatus,用來更新 Job 狀態;

注冊 syncHandler 為 syncJob,用來進行處理 queue 中的 Job;

JobController Run

// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh  -chan struct{}) {defer utilruntime.HandleCrash()
 defer jm.queue.ShutDown()
 glog.Infof(Starting job controller)
 defer glog.Infof(Shutting down job controller)
 if !controller.WaitForCacheSync(job , stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
 return
 for i := 0; i   workers; i++ {go wait.Until(jm.worker, time.Second, stopCh)
 -stopCh
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *JobController) worker() {for jm.processNextWorkItem() {func (jm *JobController) processNextWorkItem() bool {key, quit := jm.queue.Get()
 if quit {
 return false
 defer jm.queue.Done(key)
 forget, err := jm.syncHandler(key.(string))
 if err == nil {
 if forget {jm.queue.Forget(key)
 return true
 utilruntime.HandleError(fmt.Errorf( Error syncing job: %v , err))
 jm.queue.AddRateLimited(key)
 return true
}

WaitForCacheSync 等待 jobController cache 同步;

啟動 5 個 goruntine,每個協程分別執行 worker,每個 worker 執行完后等待 1s,繼續執行,如此循環;

worker 負責從從 queue 中 get job key,對每個 job,調用 syncJob 進行同步,如果 syncJob 成功,則 forget the job(其實就是讓 rate limiter 停止 tracking it),否則將該 key 再次加入到 queue 中,等待下次 sync。

syncJob

// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jm *JobController) syncJob(key string) (bool, error) {startTime := time.Now()
 defer func() {glog.V(4).Infof(Finished syncing job %q (%v) , key, time.Now().Sub(startTime))
 ns, name, err := cache.SplitMetaNamespaceKey(key)
 if err != nil {
 return false, err
 if len(ns) == 0 || len(name) == 0 {return false, fmt.Errorf( invalid job key %q: either namespace or name is missing , key)
 sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
 if err != nil {if errors.IsNotFound(err) {glog.V(4).Infof(Job has been deleted: %v , key)
 jm.expectations.DeleteExpectations(key)
 return true, nil
 return false, err
 job := *sharedJob
 // if job was finished previously, we don t want to redo the termination
 if IsJobFinished(job) {
 return true, nil
 // retrieve the previous number of retry
 previousRetry := jm.queue.NumRequeues(key)
 // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
 // and update the expectations after we ve retrieved active pods from the store. If a new pod enters
 // the store after we ve checked the expectation, the job sync is just deferred till the next relist.
 jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
 pods, err := jm.getPodsForJob(job)
 if err != nil {
 return false, err
 activePods := controller.FilterActivePods(pods)
 active := int32(len(activePods))
 succeeded, failed := getStatus(pods)
 conditions := len(job.Status.Conditions)
 // job first start
 if job.Status.StartTime == nil {now := metav1.Now()
 job.Status.StartTime =  now
 // enqueue a sync to check if job past ActiveDeadlineSeconds
 if job.Spec.ActiveDeadlineSeconds != nil {glog.V(4).Infof( Job %s have ActiveDeadlineSeconds will sync after %d seconds ,
 key, *job.Spec.ActiveDeadlineSeconds)
 jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
 var manageJobErr error
 jobFailed := false
 var failureReason string
 var failureMessage string
 jobHaveNewFailure := failed   job.Status.Failed
 // check if the number of failed jobs increased since the last syncJob
 if jobHaveNewFailure   (int32(previousRetry)+1   *job.Spec.BackoffLimit) {
 jobFailed = true
 failureReason =  BackoffLimitExceeded 
 failureMessage =  Job has reach the specified backoff limit 
 } else if pastActiveDeadline(job) {
 jobFailed = true
 failureReason =  DeadlineExceeded 
 failureMessage =  Job was active longer than specified deadline 
 if jobFailed {errCh := make(chan error, active)
 jm.deleteJobPods(job, activePods, errCh)
 select {
 case manageJobErr =  -errCh:
 if manageJobErr != nil {
 break
 default:
 // update status values accordingly
 failed += active
 active = 0
 job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
 jm.recorder.Event(job, v1.EventTypeWarning, failureReason, failureMessage)
 } else {
 if jobNeedsSync   job.DeletionTimestamp == nil {active, manageJobErr = jm.manageJob(activePods, succeeded,  job)
 completions := succeeded
 complete := false
 if job.Spec.Completions == nil {
 // This type of job is complete when any pod exits with success.
 // Each pod is capable of
 // determining whether or not the entire Job is done. Subsequent pods are
 // not expected to fail, but if they do, the failure is ignored. Once any
 // pod succeeds, the controller waits for remaining pods to finish, and
 // then the job is complete.
 if succeeded   0   active == 0 {complete = true} else {
 // Job specifies a number of completions. This type of job signals
 // success by having that number of successes. Since we do not
 // start more pods than there are remaining completions, there should
 // not be any remaining active pods once this count is reached.
 if completions  = *job.Spec.Completions {
 complete = true
 if active   0 {jm.recorder.Event( job, v1.EventTypeWarning,  TooManyActivePods ,  Too many active pods running after completion count reached)
 if completions   *job.Spec.Completions {jm.recorder.Event( job, v1.EventTypeWarning,  TooManySucceededPods ,  Too many succeeded pods running after completion count reached)
 if complete {job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete,  , ))
 now := metav1.Now()
 job.Status.CompletionTime =  now
 forget := false
 // no need to update the job if the status hasn t changed since last time
 if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
 job.Status.Active = active
 job.Status.Succeeded = succeeded
 job.Status.Failed = failed
 if err := jm.updateHandler(job); err != nil {
 return false, err
 if jobHaveNewFailure   !IsJobFinished(job) {
 // returning an error will re-enqueue Job after the backoff period
 return false, fmt.Errorf(failed pod(s) detected for job key %q , key)
 forget = true
 return forget, manageJobErr
}

從 Indexer 中查找指定的 Job 是否存在,如果不存在,則從 expectations 中刪除該 job,流程結束返回 true。否則繼續下面流程。

根據 JobCondition Complete or Failed 判斷 Job 是否 Finished,如果 Finished,則流程結束返回 true,否則繼續下面流程。

調用 SatisfiedExpectations,如果 ControlleeExpectations 中待 add 和 del 都 =0,或者 expectations 已經超過 5 分鐘沒更新過了,則返回 jobNeedsSync=true,表示需要進行一次 manageJob 了。

對于那些第一次啟動的 jobs (StartTime==nil), 需要把設置 StartTime,并且如果 ActiveDeadlineSeconds 不為空,則經過 ActiveDeadlineSeconds 后再次把該 job 加入到 queue 中進行 sync。

獲取該 job 管理的所有 pods,過濾出 activePods,計算出 actived,successed,failed pods 的數量。如果 failed job.Status.Failed,說明該 job 又有新 failed Pods 了,則 jobHaveNewFailure 為 true。

如果 jobHaveNewFailure,并且 queue 記錄的該 job retry 次數加 1,比 job.Spec.BackoffLimit(默認為 6),則表示該 job BackoffLimitExceeded,jobFailed。如果 job StartTime 到現在為止的歷時 =ActiveDeadlineSeconds,則表示該 job DeadlineExceeded,jobFailed。

如果 jobFailed,則用 sync.WaitGroup 并發等待刪除所有的前面過濾出來的 activePods,刪除成功,則 failed += acitve, active = 0, 并設置 Condition Failed 為 true。

如果 job not failed, jobNeedSync 為 true,并且 job 的 DeletionTimestamp 為空(沒有標記為刪除),則調用 manageJob 對 Job 管理的 pods 根據復雜的策略進行 add or del。

如果 job not failed 且 job.Spec.Completions 為 nil,表示 This type of job is complete when any pod exits with success。因此如果 succeeded 0 active == 0,則表示 job completed。

如果如果 job not failed 且 job.Spec.Completions 不為 nil,表示 This type of job signals success by having that number of successes。因此如果 succeeded = job.Spec.Completions,則表示 job completed。

如果 job completed,則更新其 Conditions Complete 為 true,并設置 CompletionTime。

接下來 invoke updateJobStatus 更新 etcd 中 job 狀態,如果更新失敗,則返回 false,該 job 將再次加入 queue。如果 jobHaveNewFailure 為 true,并且 Job Condition 顯示該 Job not Finished,則返回 false,該 job 將再次加入 queue。

manageJob

// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Does NOT modify  activePods .
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
 var activeLock sync.Mutex
 active := int32(len(activePods))
 parallelism := *job.Spec.Parallelism
 jobKey, err := controller.KeyFunc(job)
 if err != nil {utilruntime.HandleError(fmt.Errorf( Couldn t get key for job %#v: %v , job, err))
 return 0, nil
 var errCh chan error
 if active   parallelism {
 diff := active - parallelism
 errCh = make(chan error, diff)
 jm.expectations.ExpectDeletions(jobKey, int(diff))
 glog.V(4).Infof(Too many pods running job %q, need %d, deleting %d , jobKey, parallelism, diff)
 // Sort the pods in the order such that not-ready   ready, unscheduled
 //   scheduled, and pending   running. This ensures that we delete pods
 // in the earlier stages whenever possible.
 sort.Sort(controller.ActivePods(activePods))
 active -= diff
 wait := sync.WaitGroup{}
 wait.Add(int(diff))
 for i := int32(0); i   diff; i++ {go func(ix int32) {defer wait.Done()
 if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {defer utilruntime.HandleError(err)
 // Decrement the expected number of deletes because the informer won t observe this deletion
 glog.V(2).Infof(Failed to delete %v, decrementing expectations for job %q/%q , activePods[ix].Name, job.Namespace, job.Name)
 jm.expectations.DeletionObserved(jobKey)
 activeLock.Lock()
 active++
 activeLock.Unlock()
 errCh  - err
 }(i)
 wait.Wait()} else if active   parallelism {wantActive := int32(0)
 if job.Spec.Completions == nil {
 // Job does not specify a number of completions. Therefore, number active
 // should be equal to parallelism, unless the job has seen at least
 // once success, in which leave whatever is running, running.
 if succeeded   0 {wantActive = active} else {wantActive = parallelism} else {
 // Job specifies a specific number of completions. Therefore, number
 // active should not ever exceed number of remaining completions.
 wantActive = *job.Spec.Completions - succeeded
 if wantActive   parallelism {
 wantActive = parallelism
 diff := wantActive - active
 if diff   0 {utilruntime.HandleError(fmt.Errorf( More active than wanted: job %q, want %d, have %d , jobKey, wantActive, active))
 diff = 0
 jm.expectations.ExpectCreations(jobKey, int(diff))
 errCh = make(chan error, diff)
 glog.V(4).Infof(Too few pods running job %q, need %d, creating %d , jobKey, wantActive, diff)
 active += diff
 wait := sync.WaitGroup{}
 // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
 // and double with each successful iteration in a kind of  slow start .
 // This handles attempts to start large numbers of pods that would
 // likely all fail with the same error. For example a project with a
 // low quota that attempts to create a large number of pods will be
 // prevented from spamming the API service with the pod create requests
 // after one of its pods fails. Conveniently, this also prevents the
 // event spam that those failures would generate.
 for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff   0; batchSize = integer.Int32Min(2*batchSize, diff) {errorCount := len(errCh)
 wait.Add(int(batchSize))
 for i := int32(0); i   batchSize; i++ {go func() {defer wait.Done()
 err := jm.podControl.CreatePodsWithControllerRef(job.Namespace,  job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
 if err != nil   errors.IsTimeout(err) {
 // Pod is created but its initialization has timed out.
 // If the initialization is successful eventually, the
 // controller will observe the creation via the informer.
 // If the initialization fails, or if the pod keeps
 // uninitialized for a long time, the informer will not
 // receive any update, and the controller will create a new
 // pod when the expectation expires.
 return
 if err != nil {defer utilruntime.HandleError(err)
 // Decrement the expected number of creates because the informer won t observe this pod
 glog.V(2).Infof(Failed creation, decrementing expectations for job %q/%q , job.Namespace, job.Name)
 jm.expectations.CreationObserved(jobKey)
 activeLock.Lock()
 active--
 activeLock.Unlock()
 errCh  - err
 wait.Wait()
 // any skipped pods that we never attempted to start shouldn t be expected.
 skippedPods := diff - batchSize
 if errorCount   len(errCh)   skippedPods   0 {glog.V(2).Infof(Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q , skippedPods, job.Namespace, job.Name)
 active -= skippedPods
 for i := int32(0); i   skippedPods; i++ {
 // Decrement the expected number of creates because the informer won t observe this pod
 jm.expectations.CreationObserved(jobKey)
 // The skipped pods will be retried later. The next controller resync will
 // retry the slow start process.
 break
 diff -= batchSize
 select {
 case err :=  -errCh:
 // all errors have been reported before, we only need to inform the controller that there was an error and it should re-try this job once more next time.
 if err != nil {
 return active, err
 default:
 return active, nil
}

如果 active job.Spec.Parallelism, 表示要 scale down:

計算 active 與 parallelism 的差值 diff,修改 ControllerExpectations 中該 job 的 dels 為 diff,表示要刪除 diff 這么多的 pod。

計算 active 與 parallelism 的差值 diff,修改 ControllerExpectations 中該 job 的 dels 為 diff,表示要刪除 diff 這么多的 pod。

將 activePods 中的 Pods 按照 not-ready ready, unscheduled scheduled, pending running 進行排序,確保先刪除 stage 越早的 pods。

更新 active (active 減去 diff),用 sync.WaitGroup 并發等待刪除 etcd 中那些 Pods。如果刪除某個 Pod 失敗,active 要加 1,expectations 中 dels 要減 1.

返回 active

如果 active job.Spec.Parallelism,表示要 scale up:

如果 job.Spec.Completions 為 nil,且 succeeded 大于 0,則 diff 設為 0;如果 job.Spec.Completions 為 nil,但 successed = 0,則 diff 為 parallelism-active;如果 job.Spec.Completions 不為 nil,則 diff 為 max(job.Spec.Completions – succeeded,parallelim) – active;

修改 ControllerExpectations 中該 job 的 adds 為 diff,表示要新增 diff 這么多的 pod。

更新 active (active 加上 diff),用 sync.WaitGroup 分批的創建 Pods,第一批創建 1 個 ( 代碼寫死 SlowStartInitialBatchSize = 1),第二批創建 2,然后 4,8,16… 這樣下去,但是每次不能超過 diff 的值。每一批創建 pod 后,注意更新 diff 的值(減去 batchsize)。如果某一批創建過程 Pods 中存在失敗情況,則更新 active 和 expectations 中 adds,且不進行后續未啟動的批量創建 pods 行為。

如果 active == job.Spec.Parallelism,返回 active。

感謝各位的閱讀,以上就是“Kubernetes Job Controller 怎么構造”的內容了,經過本文的學習后,相信大家對 Kubernetes Job Controller 怎么構造這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是丸趣 TV,丸趣 TV 小編將為大家推送更多相關知識點的文章,歡迎關注!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計16744字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 宝应县| 和平县| 盐亭县| 嘉鱼县| 宁强县| 古田县| 泽州县| 章丘市| 抚远县| 公主岭市| 同心县| 桑日县| 临澧县| 永胜县| 玛沁县| 合肥市| 乳源| 临漳县| 江安县| 葵青区| 承德县| 安平县| 望都县| 高阳县| 西畴县| 玛曲县| 临邑县| 明星| 枣庄市| 呼伦贝尔市| 黄石市| 玉门市| 杨浦区| 吴江市| 盘山县| 永春县| 元江| 东安县| 连南| 白银市| 鸡西市|