共計 17809 個字符,預計需要花費 45 分鐘才能閱讀完成。
這篇文章主要講解了“Preemption 搶占式調度的方法是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著丸趣 TV 小編的思路慢慢深入,一起來研究和學習“Preemption 搶占式調度的方法是什么”吧!
ScheduleAlgorithm 的變化
在 Kubernetes 1.8 中,對 ScheduleAlgorithm Interface 的定義發生了改變,多了一個 Preempt(…)。因此,我在博文 Kubernetes Scheduler 原理解析(當時是基于 kubernetes 1.5)中對 scheduler 調度過程開的一句話概括“將 PodSpec.NodeName 為空的 Pods 逐個地,經過預選 (Predicates) 和優選 (Priorities) 兩個步驟,挑選最合適的 Node 作為該 Pod 的 Destination。”將不再準確了。
現在應該一句話這樣描述才算準確了:“將 PodSpec.NodeName 為空的 Pods 逐個地,經過預選 (Predicates) 和優選 (Priorities) 兩個步驟,挑選最合適的 Node 作為該 Pod 的 Destination。如果經過預選和優選仍然沒有找到合適的節點,并且啟動了 Pod Priority,那么該 Pod 將會進行 Preempt 搶占式調度找到最合適的節點及需要 Evict 的 Pods。”
// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
type ScheduleAlgorithm interface {Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, and error if any.
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, err error)
// Predicates() returns a pointer to a map of predicate functions. This is
// exposed for testing.
Predicates() map[string]FitPredicate
// Prioritizers returns a slice of priority config. This is exposed for
// testing.
Prioritizers() []PriorityConfig
}
Scheduler.scheduleOne 開始真正的調度邏輯,每次負責一個 Pod 的調度,邏輯如下:
從 PodQueue 中獲取一個 Pod。
執行對應 Algorithm 的 Schedule,進行預選和優選。
AssumePod
Bind Pod,如果 Bind Failed,ForgetPod。
在 1.8 中,但預選和優選調度完整沒有找到合適 node 時(其實一定會是預選沒有找到 nodes,優選只是挑更好的),還會調用 sched.preempt 進行搶占式調度。
plugin/pkg/scheduler/scheduler.go:293
func (sched *Scheduler) scheduleOne() {pod := sched.config.NextPod()
if pod.DeletionTimestamp != nil {sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, FailedScheduling , skip schedule deleting pod: %v/%v , pod.Namespace, pod.Name)
glog.V(3).Infof(Skip schedule deleting pod: %v/%v , pod.Namespace, pod.Name)
return
glog.V(3).Infof(Attempting to schedule pod: %v/%v , pod.Namespace, pod.Name)
// Synchronously attempt to find a fit for the pod.
start := time.Now()
suggestedHost, err := sched.schedule(pod)
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
if err != nil {// schedule() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
if fitError, ok := err.(*core.FitError); ok {sched.preempt(pod, fitError)
return
// Tell the cache to assume that a pod now is running on a given node, even though it hasn t been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
assumedPod := *pod
// assume modifies `assumedPod` by setting NodeName=suggestedHost
err = sched.assume(assumedPod, suggestedHost)
if err != nil {
return
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
err := sched.bind( assumedPod, v1.Binding{ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{
Kind: Node ,
Name: suggestedHost,
metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
if err != nil {glog.Errorf( Internal error binding pod: (%v) , err)
}
Scheduler.preemt
好的,關于預選和優選,我這里不做過多解讀,因為整個源碼邏輯和 1.5 是一樣,不同的是 1.8 增加了更多的 Predicate 和 Priority Policys 及其實現。下面只看搶占式調度 Preempt 的代碼。
plugin/pkg/scheduler/scheduler.go:191
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {if !utilfeature.DefaultFeatureGate.Enabled(features.PodPriority) {glog.V(3).Infof(Pod priority feature is not enabled. No preemption is performed.)
return , nil
preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
if err != nil {glog.Errorf( Error getting the updated preemptor pod object: %v , err)
return , err
node, victims, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
if err != nil {glog.Errorf( Error preempting victims to make room for %v/%v. , preemptor.Namespace, preemptor.Name)
return , err
if node == nil {
return , err
glog.Infof(Preempting %d pod(s) on node %v to make room for %v/%v. , len(victims), node.Name, preemptor.Namespace, preemptor.Name)
annotations := map[string]string{core.NominatedNodeAnnotationKey: node.Name}
err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations)
if err != nil {glog.Errorf( Error in preemption process. Cannot update pod %v annotations: %v , preemptor.Name, err)
return , err
for _, victim := range victims {if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {glog.Errorf( Error preempting pod %v/%v: %v , victim.Namespace, victim.Name, err)
return , err
sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, Preempted , by %v/%v on node %v , preemptor.Namespace, preemptor.Name, node.Name)
return node.Name, err
}
檢查 FeaturesGate 中是否開啟了 PodPriority,如果沒開啟,則不會進行后續 Preemption 操作;
由于該 Pod 在 Predicate/Priortiy 調度過程失敗后,會更新 PodCondition,記錄調度失敗狀態及失敗原因。因此需要從 apiserver 中獲取 PodCondition 更新后的 Pod Object;
調用 ScheduleAlgorithm.Preempt 進行搶占式調度,選出最佳 node 和待 preempt pods(稱為 victims);
調用 apiserver 給該 pod(稱為 Preemptor)打上 Annotation:NominatedNodeName=nodeName;
遍歷 victims,調用 apiserver 進行逐個刪除這些 pods;
注意:在 scheduler 調用 shed.schedule(pod)進行預選和優選調度失敗時,Pod Bind Node 失敗,該 Pod 會 requeue unscheduled Cache podqueue 中,如果在這個 pod 調度過程中又有新的 pod 加入到待調度隊列,那么該 pod requeue 時它前面就有其他 pod,下一次調度就是先調度在它前面的 pod,而這些 pod 的調度有可能會調度到剛剛通過 Preempt 釋放資源的 Node 上,導致把剛才 Preemptor 釋放的 resource 消耗掉。當再次輪到上次的 Preemptor 調度時,可能又需要觸發一次某個節點的 Preempt。
genericScheduler.Preempt
ScheduleAlgorithm.Preempt 是搶占式調度的關鍵實現,其對應的實現在 genericScheduler 中:
plugin/pkg/scheduler/core/generic_scheduler.go:181
// preempt finds nodes with pods that can be preempted to make room for pod to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns the node and the list of preempted pods if such a node is found.
// TODO(bsalamat): Add priority-based scheduling. More info: today one or more
// pending pods (different from the pod that triggered the preemption(s)) may
// schedule into some portion of the resources freed up by the preemption(s)
// before the pod that triggered the preemption(s) has a chance to schedule
// there, thereby preventing the pod that triggered the preemption(s) from
// scheduling. Solution is given at:
// https://github.com/kubernetes/community/blob/master/contributors/design-proposals/pod-preemption.md#preemption-mechanics
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) {
// Scheduler may return various types of errors. Consider preemption only if
// the error is of type FitError.
fitError, ok := scheduleErr.(*FitError)
if !ok || fitError == nil {
return nil, nil, nil
err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
if err != nil {
return nil, nil, err
if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {glog.V(5).Infof(Pod %v is not eligible for more preemption. , pod.Name)
return nil, nil, nil
allNodes, err := nodeLister.List()
if err != nil {
return nil, nil, err
if len(allNodes) == 0 {
return nil, nil, ErrNoNodesAvailable
potentialNodes := nodesWherePreemptionMightHelp(pod, allNodes, fitError.FailedPredicates)
if len(potentialNodes) == 0 {glog.V(3).Infof(Preemption will not help schedule pod %v on any node. , pod.Name)
return nil, nil, nil
nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer)
if err != nil {
return nil, nil, err
for len(nodeToPods) 0 {node := pickOneNodeForPreemption(nodeToPods)
if node == nil {
return nil, nil, err
passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToPods[node], g.cachedNodeInfoMap, g.extenders)
if passes pErr == nil {return node, nodeToPods[node], err
if pErr != nil {glog.Errorf( Error occurred while checking extenders for preemption on node %v: %v , node, pErr)
// Remove the node from the map and try to pick a different node.
delete(nodeToPods, node)
return nil, nil, err
}
sched.schedule error 檢查
只有前面 sched.schedule()返回的 error 為 FitError 類型時,才會觸發后續的 Preemption。FitError 就是表示 pod 在 Predicate 階段進行某些 PredicateFunc 篩選時不通過。也就是說只有預選失敗的 Pod 才會進行搶占式調度。
更新 scheduler cache 中的 NodeInfo
更新 scheduler cache 中 NodeInfo,主要是更新 Node 上 scheduled 和 Assumed Pods,作為后續 Preempt Pods 時的考慮范圍,確保 Preemption 是正確的。
podEligibleToPreemptOthers 檢查 pod 是否有資格進行搶占式調度
invoke podEligibleToPreemptOthers 來判斷該 pod 是否適合進行后續的 Preemption,判斷邏輯是:
如果該 Pod 已經包含 Annotation:NominatedNodeName=nodeName(說明該 pod 之前已經 Preempted),并且 Annotation 中的這個 Node 有比該 pod 優先級更低的 pod 正在 Terminating,則認為該 pod 不適合進行后續的 Preemption,流程結束。
除此之外,繼續后續的流程。
對應代碼如下:
plugin/pkg/scheduler/core/generic_scheduler.go:756
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {if nodeName, found := pod.Annotations[NominatedNodeAnnotationKey]; found {if nodeInfo, found := nodeNameToInfo[nodeName]; found {for _, p := range nodeInfo.Pods() {if p.DeletionTimestamp != nil util.GetPodPriority(p) util.GetPodPriority(pod) {
// There is a terminating pod on the nominated node.
return false
return true
}
nodesWherePreemptionMightHelp 篩選出 Potential Nodes
invoke nodesWherePreemptionMightHelp 來獲取 potential nodes。nodesWherePreemptionMightHelp 的邏輯是:
NodeSelectorNotMatch,
PodNotMatchHostName,
TaintsTolerationsNotMatch,
NodeLabelPresenceViolated,
NodeNotReady,
NodeNetworkUnavailable,
NodeUnschedulable,
NodeUnknownCondition
遍歷所有的 nodes,對每個 nodes 在 sched.schedule()在預選階段失敗的 Predicate 策略 (failedPredicates) 進行掃描,如果 failedPredicates 包含以下 Policy,則說明該 node 不適合作為 Preempt 的備選節點。
除此之外的 Node 均作為 Potential Nodes。
對應代碼如下:
func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {potentialNodes := []*v1.Node{}
for _, node := range nodes {
unresolvableReasonExist := false
failedPredicates, found := failedPredicatesMap[node.Name]
// If we assume that scheduler looks at all nodes and populates the failedPredicateMap
// (which is the case today), the !found case should never happen, but we d prefer
// to rely less on such assumptions in the code when checking does not impose
// significant overhead.
for _, failedPredicate := range failedPredicates {
switch failedPredicate {
case
predicates.ErrNodeSelectorNotMatch,
predicates.ErrPodNotMatchHostName,
predicates.ErrTaintsTolerationsNotMatch,
predicates.ErrNodeLabelPresenceViolated,
predicates.ErrNodeNotReady,
predicates.ErrNodeNetworkUnavailable,
predicates.ErrNodeUnschedulable,
predicates.ErrNodeUnknownCondition:
unresolvableReasonExist = true
break
// TODO(bsalamat): Please add affinity failure cases once we have specific affinity failure errors.
if !found || !unresolvableReasonExist {glog.V(3).Infof(Node %v is a potential node for preemption. , node.Name)
potentialNodes = append(potentialNodes, node)
return potentialNodes
}
selectNodesForPreemption 和 selectVictimsOnNode 選出可行 Nodes 及其對應的 victims
invoke selectNodesForPreemption 從 Potential Nodes 中找出所有可行的 Nodes 及對應的 victim Pods,其對應的邏輯如為:啟動 max(16, potentialNodesNum)個 worker(對應 goruntine)通過 WaitGroups 并發等待所有 node 的 check 完成:
遍歷該 node 上所有的 scheduled pods(包括 assumed pods),將優先級比 Preemptor 更低的 Pods 都加入到 Potential victims List 中,并且將這些 victims 從 NodeInfoCopy 中刪除,下次進行 Predicate 時就意味著 Node 上有更多資源可用。
對 Potential victims 中元素進行排序,排序規則是按照優先級從高到底排序的,index 為 0 的對應的優先級最高。
檢查 Preemptor 是否能 scheduler 配置的所有 Predicates Policy(基于前面將這些 victims 從 NodeInfoCopy 中刪除,將所有更低優先級的 pods 資源全部釋放了),如果不通過則返回,表示該 node 不合適。All Predicate 通過后,繼續下面流程。
遍歷所有的 Potential victims list item(已經按照優先級從高到底排序),試著把 Potential victims 中第一個 Pod(優先級最高)加回到 NodeInfoCopy 中,再檢查 Preemptor 是否能 scheduler 配置的所有 Predicates Policy,如果不滿足就把該 pod 再從 NodeInfoCopy 中刪除,并且正式加入到 victims list 中。接著對 Potential victims 中第 2,3… 個 Pod 進行同樣處理。這樣做,是為了保證盡量保留優先級更高的 Pods,盡量刪除更少的 Pods。
最終返回每個可行 node 及其對應 victims list。
selectNodesForPreemption 代碼如下,其實核心代碼在 selectVictimsOnNode。
plugin/pkg/scheduler/core/generic_scheduler.go:583
func selectNodesForPreemption(pod *v1.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
potentialNodes []*v1.Node,
predicates map[string]algorithm.FitPredicate,
metadataProducer algorithm.PredicateMetadataProducer,
) (map[*v1.Node][]*v1.Pod, error) {nodeNameToPods := map[*v1.Node][]*v1.Pod{}
var resultLock sync.Mutex
// We can use the same metadata producer for all nodes.
meta := metadataProducer(pod, nodeNameToInfo)
checkNode := func(i int) {nodeName := potentialNodes[i].Name
var metaCopy algorithm.PredicateMetadata
if meta != nil {metaCopy = meta.ShallowCopy()
pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates)
if fits {resultLock.Lock()
nodeNameToPods[potentialNodes[i]] = pods
resultLock.Unlock()
workqueue.Parallelize(16, len(potentialNodes), checkNode)
return nodeNameToPods, nil
}
plugin/pkg/scheduler/core/generic_scheduler.go:659
func selectVictimsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo,
fitPredicates map[string]algorithm.FitPredicate) ([]*v1.Pod, bool) {potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
nodeInfoCopy := nodeInfo.Clone()
removePod := func(rp *v1.Pod) {nodeInfoCopy.RemovePod(rp)
if meta != nil {meta.RemovePod(rp)
addPod := func(ap *v1.Pod) {nodeInfoCopy.AddPod(ap)
if meta != nil {meta.AddPod(ap, nodeInfoCopy)
// As the first step, remove all the lower priority pods from the node and
// check if the given pod can be scheduled.
podPriority := util.GetPodPriority(pod)
for _, p := range nodeInfoCopy.Pods() {if util.GetPodPriority(p) podPriority {potentialVictims.Items = append(potentialVictims.Items, p)
removePod(p)
potentialVictims.Sort()
// If the new pod does not fit after removing all the lower priority pods,
// we are almost done and this node is not suitable for preemption. The only condition
// that we should check is if the pod is failing to schedule due to pod affinity
// failure.
// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
if err != nil {glog.Warningf( Encountered error while selecting victims on node %v: %v , nodeInfo.Node().Name, err)
return nil, false
victims := []*v1.Pod{}
// Try to reprieve as many pods as possible starting from the highest priority one.
for _, p := range potentialVictims.Items {lpp := p.(*v1.Pod)
addPod(lpp)
if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {removePod(lpp)
victims = append(victims, lpp)
glog.V(5).Infof(Pod %v is a potential preemption victim on node %v. , lpp.Name, nodeInfo.Node().Name)
return victims, true
}
pickOneNodeForPreemption 從可行 Nodes 中找出最合適的一個 Node
如果上一步至少找到一個可行 node,則調用 pickOneNodeForPreemption 按照以下邏輯選擇一個最合適的 node:
選擇 victims 中最高 pod 優先級最低的那個 Node。
如果上一步有不止一個 Nodes 滿足條件,則再對選擇所有 victims 優先級之和最小的那個 Node。
如果上一步有不止一個 Nodes 滿足條件,則再選擇 victims pod 數最少的 Node。
如果上一步有不止一個 Nodes 滿足條件,則再隨機選擇一個 Node。
以上每一步的 Nodes 列表,都是基于上一步篩選后的 Nodes。
plugin/pkg/scheduler/core/generic_scheduler.go:501
func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node {
type nodeScore struct {
node *v1.Node
highestPriority int32
sumPriorities int64
numPods int
if len(nodesToPods) == 0 {
return nil
minHighestPriority := int32(math.MaxInt32)
minPriorityScores := []*nodeScore{}
for node, pods := range nodesToPods {if len(pods) == 0 {
// We found a node that doesn t need any preemption. Return it!
// This should happen rarely when one or more pods are terminated between
// the time that scheduler tries to schedule the pod and the time that
// preemption logic tries to find nodes for preemption.
return node
// highestPodPriority is the highest priority among the victims on this node.
highestPodPriority := util.GetPodPriority(pods[0])
if highestPodPriority minHighestPriority {
minHighestPriority = highestPodPriority
minPriorityScores = nil
if highestPodPriority == minHighestPriority {minPriorityScores = append(minPriorityScores, nodeScore{node: node, highestPriority: highestPodPriority, numPods: len(pods)})
if len(minPriorityScores) == 1 {return minPriorityScores[0].node
// There are a few nodes with minimum highest priority victim. Find the
// smallest sum of priorities.
minSumPriorities := int64(math.MaxInt64)
minSumPriorityScores := []*nodeScore{}
for _, nodeScore := range minPriorityScores {
var sumPriorities int64
for _, pod := range nodesToPods[nodeScore.node] {
// We add MaxInt32+1 to all priorities to make all of them = 0. This is
// needed so that a node with a few pods with negative priority is not
// picked over a node with a smaller number of pods with the same negative
// priority (and similar scenarios).
sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
if sumPriorities minSumPriorities {
minSumPriorities = sumPriorities
minSumPriorityScores = nil
nodeScore.sumPriorities = sumPriorities
if sumPriorities == minSumPriorities {minSumPriorityScores = append(minSumPriorityScores, nodeScore)
if len(minSumPriorityScores) == 1 {return minSumPriorityScores[0].node
// There are a few nodes with minimum highest priority victim and sum of priorities.
// Find one with the minimum number of pods.
minNumPods := math.MaxInt32
minNumPodScores := []*nodeScore{}
for _, nodeScore := range minSumPriorityScores {
if nodeScore.numPods minNumPods {
minNumPods = nodeScore.numPods
minNumPodScores = nil
if nodeScore.numPods == minNumPods {minNumPodScores = append(minNumPodScores, nodeScore)
// At this point, even if there are more than one node with the same score,
// return the first one.
if len(minNumPodScores) 0 {return minNumPodScores[0].node
glog.Errorf(Error in logic of node scoring for preemption. We should never reach here!)
return nil
}
最合適的 Node 仍然要交給 extender(if configed)檢查
如果 scheduler 配置 extender scheduler,則還需要通過 invoke nodePassesExtendersForPreemption 再次將該 pod 和(假設)剔除 victims 的該 node 交給 extender.Filter 進行一下檢查,只有檢查通過了才返回該 node 作為最終選擇的 Preempt node。
關于 extender 的理解,請參考如何對 kubernetes scheduler 進行二次開發和 Kubernetes Scheduler 源碼分析。其實用的場景不多,現在支持自定義調度器了,就更少需要使用 scheduler extender 了。
感謝各位的閱讀,以上就是“Preemption 搶占式調度的方法是什么”的內容了,經過本文的學習后,相信大家對 Preemption 搶占式調度的方法是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是丸趣 TV,丸趣 TV 小編將為大家推送更多相關知識點的文章,歡迎關注!