久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何進行main方法與Leader選舉分析

169次閱讀
沒有評論

共計 15660 個字符,預計需要花費 40 分鐘才能閱讀完成。

如何進行 main 方法與 Leader 選舉分析,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

main 方法與 Leader 選舉分析 1.main 方法分析

主要對 main 方法的主要邏輯進行分析,以及分析下組件的 EventHandler,看該組件 list/watch 哪些對象,對象事件來了怎么處理,以及 claimQueue 與 volumeQueue 的對象來源。

main 方法主要邏輯分析

main 方法主要邏輯:
(1)解析啟動參數;
(2)根據配置建立 clientset;
(3)建立 grpcclient;
(4)進行 grpc 探測(探測 cephcsi-rbd 服務是否準備好),直至探測成功;
(5)通過 grpc 獲取 driver 名稱與能力;
(6)根據 clientset 建立 informers;
(7)構建 provisionController 對象;
(8)定義 run 方法(包括了 provisionController.Run);
(9)根據 –enable-leader-election 組件啟動參數配置決定是否開啟 Leader 選舉,當不開啟時,直接運行 run 方法,開啟時調用 le.Run()。

func main() {
 var config *rest.Config
 var err error
 flag.Var(utilflag.NewMapStringBool( featureGates),  feature-gates ,  A set of key=value pairs that describe feature gates for alpha/experimental features.  +
 Options are:\n +strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(),  \n ))
 klog.InitFlags(nil)
 flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
 flag.Set(logtostderr ,  true)
 flag.Parse()
 if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(featureGates); err != nil {klog.Fatal(err)
 if *showVersion {fmt.Println(os.Args[0], version)
 os.Exit(0)
 klog.Infof(Version: %s , version)
 // get the KUBECONFIG from env if specified (useful for local/debug cluster)
 kubeconfigEnv := os.Getenv(KUBECONFIG)
 if kubeconfigEnv !=   {klog.Infof( Found KUBECONFIG environment variable set, using that..)
 kubeconfig =  kubeconfigEnv
 if *master !=   || *kubeconfig !=   {klog.Infof( Either master or kubeconfig specified. building kube config from that..)
 config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
 } else {klog.Infof( Building kube configs for running in cluster...)
 config, err = rest.InClusterConfig()
 if err != nil {klog.Fatalf( Failed to create config: %v , err)
 clientset, err := kubernetes.NewForConfig(config)
 if err != nil {klog.Fatalf( Failed to create client: %v , err)
 // snapclientset.NewForConfig creates a new Clientset for VolumesnapshotV1beta1Client
 snapClient, err := snapclientset.NewForConfig(config)
 if err != nil {klog.Fatalf( Failed to create snapshot client: %v , err)
 // The controller needs to know what the server version is because out-of-tree
 // provisioners aren t officially supported until 1.5
 serverVersion, err := clientset.Discovery().ServerVersion()
 if err != nil {klog.Fatalf( Error getting server version: %v , err)
 metricsManager := metrics.NewCSIMetricsManager( /* driverName */)
 grpcClient, err := ctrl.Connect(*csiEndpoint, metricsManager)
 if err != nil {klog.Error(err.Error())
 os.Exit(1)
 } 
 //  循環探測,直至 CSI driver 即 cephcsi-rbd 服務準備好
 err = ctrl.Probe(grpcClient, *operationTimeout)
 if err != nil {klog.Error(err.Error())
 os.Exit(1)
 //  從 ceph-csi 組件中獲取 driver name
 provisionerName, err := ctrl.GetDriverName(grpcClient, *operationTimeout)
 if err != nil {klog.Fatalf( Error getting CSI driver name: %s , err)
 klog.V(2).Infof(Detected CSI driver %s , provisionerName)
 metricsManager.SetDriverName(provisionerName)
 metricsManager.StartMetricsEndpoint(*metricsAddress, *metricsPath)
 
 //  從 ceph-csi 組件中獲取 driver 能力
 pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout)
 if err != nil {klog.Fatalf( Error getting CSI driver capabilities: %s , err)
 // Generate a unique ID for this provisioner
 timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
 identity := strconv.FormatInt(timeStamp, 10) +  -  + strconv.Itoa(rand.Intn(10000)) +  -  + provisionerName
 
 //  開始構建 infomer
 factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
 // -------------------------------
 // Listers
 // Create informer to prevent hit the API server for all resource request
 scLister := factory.Storage().V1().StorageClasses().Lister()
 claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()
 var csiNodeLister storagelistersv1beta1.CSINodeLister
 var nodeLister v1.NodeLister
 if ctrl.SupportsTopology(pluginCapabilities) {csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister()
 nodeLister = factory.Core().V1().Nodes().Lister()
 // -------------------------------
 // PersistentVolumeClaims informer
 rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
 claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter,  claims)
 claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()
 // Setup options
 provisionerOptions := []func(*controller.ProvisionController) error{controller.LeaderElection(false), // Always disable leader election in provisioner lib. Leader election should be done here in the CSI provisioner level instead.
 controller.FailedProvisionThreshold(0),
 controller.FailedDeleteThreshold(0),
 controller.RateLimiter(rateLimiter),
 controller.Threadiness(int(*workerThreads)),
 controller.CreateProvisionedPVLimiter(workqueue.DefaultControllerRateLimiter()),
 controller.ClaimsInformer(claimInformer),
 translator := csitrans.New()
 supportsMigrationFromInTreePluginName :=  
 if translator.IsMigratedCSIDriverByName(provisionerName) {supportsMigrationFromInTreePluginName, err = translator.GetInTreeNameFromCSIName(provisionerName)
 if err != nil {klog.Fatalf( Failed to get InTree plugin name for migrated CSI plugin %s: %v , provisionerName, err)
 klog.V(2).Infof(Supports migration from in-tree plugin: %s , supportsMigrationFromInTreePluginName)
 provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
 // Create the provisioner: it implements the Provisioner interface expected by
 // the controller
 csiProvisioner := ctrl.NewCSIProvisioner(
 clientset,
 *operationTimeout,
 identity,
 *volumeNamePrefix,
 *volumeNameUUIDLength,
 grpcClient,
 snapClient,
 provisionerName,
 pluginCapabilities,
 controllerCapabilities,
 supportsMigrationFromInTreePluginName,
 *strictTopology,
 translator,
 scLister,
 csiNodeLister,
 nodeLister,
 claimLister,
 *extraCreateMetadata,
 provisionController = controller.NewProvisionController(
 clientset,
 provisionerName,
 csiProvisioner,
 serverVersion.GitVersion,
 provisionerOptions...,
 csiClaimController := ctrl.NewCloningProtectionController(
 clientset,
 claimLister,
 claimInformer,
 claimQueue,
 
 //  主循環函數
 run := func(context.Context) {stopCh := context.Background().Done()
 factory.Start(stopCh)
 cacheSyncResult := factory.WaitForCacheSync(stopCh)
 for _, v := range cacheSyncResult {
 if !v {klog.Fatalf( Failed to sync Informers!)
 
 //  跑兩個 controller,后面主要分析 provisionController
 go csiClaimController.Run(int(*finalizerThreads), stopCh)
 provisionController.Run(wait.NeverStop)
 
 // Leader  選舉相關
 if !*enableLeaderElection {run(context.TODO())
 } else {
 // this lock name pattern is also copied from sigs.k8s.io/sig-storage-lib-external-provisioner/v5/controller
 // to preserve backwards compatibility
 lockName := strings.Replace(provisionerName,  / ,  - , -1)
 
 //  使用 endpoints 或 leases 資源對象來選 leader
 var le leaderElection
 if *leaderElectionType ==  endpoints  {klog.Warning( The  endpoints  leader election type is deprecated and will be removed in a future release. Use  --leader-election-type=leases  instead.)
 le = leaderelection.NewLeaderElectionWithEndpoints(clientset, lockName, run)
 } else if *leaderElectionType ==  leases  {le = leaderelection.NewLeaderElection(clientset, lockName, run)
 } else {klog.Error( --leader-election-type must be either  endpoints  or  leases)
 os.Exit(1)
 if *leaderElectionNamespace !=   {le.WithNamespace(*leaderElectionNamespace)
 
 //  處理 Leader  選舉邏輯的方法
 if err := le.Run(); err != nil {klog.Fatalf( failed to initialize leader election: %v , err)
}

controller.NewProvisionController

主要看到 EventHandler,定義了該組件 list/watch 的對象,對象事件來了怎么處理,以及 claimQueue 與 volumeQueue 的對象來源。

claimHandler

可以看到,claimQueue 的來源是 pvc 對象的新增、更新事件(對 claimQueue 的處理已在 external-provisioner 源碼分析(1)- 主體處理邏輯分析中講過,忘了的話可以回頭看下)。

 ...
 // PersistentVolumeClaims
 claimHandler := cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) { controller.enqueueClaim(obj) },
 UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueClaim(newObj) },
 DeleteFunc: func(obj interface{}) {
 // NOOP. The claim is either in claimsInProgress and in the queue, so it will be processed as usual
 // or it s not in claimsInProgress and then we don t care
 if controller.claimInformer != nil {controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
 } else {controller.claimInformer = informer.Core().V1().PersistentVolumeClaims().Informer()
 controller.claimInformer.AddEventHandler(claimHandler)
 ...
// enqueueClaim takes an obj and converts it into UID that is then put onto claim work queue.
func (ctrl *ProvisionController) enqueueClaim(obj interface{}) {uid, err := getObjectUID(obj)
 if err != nil {utilruntime.HandleError(err)
 return
 if ctrl.claimQueue.NumRequeues(uid) == 0 {ctrl.claimQueue.Add(uid)
}

volumeHandler

可以看到,volumeQueue 的來源是 pv 對象的新增、更新事件(對 volumeQueue 的處理已在 external-provisioner 源碼分析(1)- 主體處理邏輯分析中講過,忘了的話可以回頭看下)。

 ...
 // PersistentVolumes
 volumeHandler := cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) { controller.enqueueVolume(obj) },
 UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueVolume(newObj) },
 DeleteFunc: func(obj interface{}) { controller.forgetVolume(obj) },
 if controller.volumeInformer != nil {controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
 } else {controller.volumeInformer = informer.Core().V1().PersistentVolumes().Informer()
 controller.volumeInformer.AddEventHandler(volumeHandler)
 ...
// enqueueVolume takes an obj and converts it into a namespace/name string which
// is then put onto the given work queue.
func (ctrl *ProvisionController) enqueueVolume(obj interface{}) {
 var key string
 var err error
 if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {utilruntime.HandleError(err)
 return
 // Re-Adding is harmless but try to add it to the queue only if it is not
 // already there, because if it is already there we *must* be retrying it
 if ctrl.volumeQueue.NumRequeues(key) == 0 {ctrl.volumeQueue.Add(key)
}
// forgetVolume Forgets an obj from the given work queue, telling the queue to
// stop tracking its retries because e.g. the obj was deleted
func (ctrl *ProvisionController) forgetVolume(obj interface{}) {
 var key string
 var err error
 if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {utilruntime.HandleError(err)
 return
 ctrl.volumeQueue.Forget(key)
 ctrl.volumeQueue.Done(key)
}

2.Leader 選舉分析

在 Golang 中,k8s client-go 這個 package 針對 Leader 相關功能進行了封裝,支持 3 種鎖資源,endpoint,configmap,lease,方便使用。

Leader 選舉基本原理

Leader 選舉基本原理其實就是利用通過 Kubernetes 中 configmap,endpoints 或者 lease 資源實現一個分布式鎖,搶 (acqure) 到鎖的節點成為 leader,并且定期更新(renew)。其他進程也在不斷的嘗試進行搶占,搶占不到則繼續等待下次循環。當 leader 節點掛掉之后,租約到期,其他節點就成為新的 leader。

搶到鎖其實就是成功把該進程的相關信息(如進程唯一標識)寫入 configmap、endpoints 或者 lease 資源對象中;而定期更新其實就是定期更新該資源的鎖更新時間,以延續租期。

多個進程同時獲取鎖(更新鎖資源)時,由 apiserver 來保證鎖資源 update 的原子操作,通過對比 resourceVersion 版本號(resourceVersion 的取值最終來源于 etcd 的 modifiedindex),保證只有一個進程能修改成功,也即只有一個進程能成功獲取到鎖。

鎖示例如下:

apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
 creationTimestamp:  2020-08-21T11:56:46Z 
 name: rbd-csi-ceph-com
 namespace: default
 resourceVersion:  69642798 
 selfLink: /apis/coordination.k8s.io/v1/namespaces/default/leases/rbd-csi-ceph-com
 uid: c9a7ea00-c000-4c5c-b90f-d0e7c85240ca
spec:
 acquireTime:  2020-08-21T11:56:46.907075Z 
 holderIdentity: cld-dnode3-1091-i-nease-net
 leaseDurationSeconds: 15
 leaseTransitions: 0
 renewTime:  2020-09-07T02:38:24.587170Z

其中 holderIdentity 記錄了獲取到鎖的進程信息,renewTime 記錄了鎖更新時間。

external-provisioner 的 Leader 選舉

從 main 方法代碼中可以看出,在 external-provisioner 組件中,僅支持 endpoint 與 lease 兩種鎖資源,且 endpoints 鎖會在后續被棄用,所以建議使用 lease 鎖。

external-provisioner 組件的高可用選主邏輯與 k8s 中的 kube-controller-manager、kube-scheduler 等組件的高可用選主邏輯類似。

概要過程:
(1)組件啟動時,定期循環的去獲取 lease 鎖,獲取成功則成為 leader 且返回,否則一直阻塞;
(2)獲取 lease 鎖成功,則競選 leader 成功,然后運行 external-provisioner 組件的主體處理邏輯;
(3)競選 leader 成功后,繼續定期循環續約,以保證 leader 的長久性。

下面進行代碼的分析。

le.Run()

當 –enable-leader-election 組件啟動參數為 true 時,運行該方法,主要邏輯為:
(1)定義 leaderConfig 結構體;
(2)調用 leaderelection.RunOrDie 做進一步的選主邏輯處理。

func (l *leaderElection) Run() error {
 if l.identity ==   {id, err := defaultLeaderElectionIdentity()
 if err != nil {return fmt.Errorf( error getting the default leader identity: %v , err)
 l.identity = id
 if l.namespace ==   {l.namespace = inClusterNamespace()
 broadcaster := record.NewBroadcaster()
 broadcaster.StartRecordingToSink(corev1.EventSinkImpl{Interface: l.clientset.CoreV1().Events(l.namespace)})
 eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf( %s/%s , l.lockName, string(l.identity))})
 rlConfig := resourcelock.ResourceLockConfig{Identity: sanitizeName(l.identity),
 EventRecorder: eventRecorder,
 lock, err := resourcelock.New(l.resourceLock, l.namespace, sanitizeName(l.lockName), l.clientset.CoreV1(), l.clientset.CoordinationV1(), rlConfig)
 if err != nil {
 return err
 leaderConfig := leaderelection.LeaderElectionConfig{
 Lock: lock,
 LeaseDuration: l.leaseDuration,
 RenewDeadline: l.renewDeadline,
 RetryPeriod: l.retryPeriod,
 Callbacks: leaderelection.LeaderCallbacks{OnStartedLeading: func(ctx context.Context) {klog.V(2).Info(became leader, starting)
 l.runFunc(ctx)
 OnStoppedLeading: func() {klog.Fatal( stopped leading)
 OnNewLeader: func(identity string) {klog.V(3).Infof(new leader detected, current leader: %s , identity)
 leaderelection.RunOrDie(context.TODO(), leaderConfig)
 return nil // should never reach here
}

leaderelection.RunOrDie()

主要邏輯:
(1)調用 le.acquire()方法來嘗試競選為 leader(acquire 方法會定期循環的去獲取 lease 鎖,獲取成功則成為 leader 且返回,否則一直阻塞);
(2)競選 leader 成功,運行 run 方法;
(3)調用 le.renew()續約方法,定期循環續約。

// RunOrDie starts a client with the provided config or panics if the config
// fails to validate.
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {le, err := NewLeaderElector(lec)
 if err != nil {panic(err)
 if lec.WatchDog != nil {lec.WatchDog.SetLeaderElection(le)
 le.Run(ctx)
// Run starts the leader election loop
func (le *LeaderElector) Run(ctx context.Context) {defer func() {runtime.HandleCrash()
 le.config.Callbacks.OnStoppedLeading()
 if !le.acquire(ctx) {
 return // ctx signalled done
 ctx, cancel := context.WithCancel(ctx)
 defer cancel()
 go le.config.Callbacks.OnStartedLeading(ctx)
 le.renew(ctx)
// acquire 會不斷循環的去獲取 lease 鎖,獲取成功則成為 leader 且返回
// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if ctx signals done.
func (le *LeaderElector) acquire(ctx context.Context) bool {ctx, cancel := context.WithCancel(ctx)
 defer cancel()
 succeeded := false
 desc := le.config.Lock.Describe()
 klog.Infof(attempting to acquire leader lease %v... , desc)
 wait.JitterUntil(func() {succeeded = le.tryAcquireOrRenew()
 le.maybeReportTransition()
 if !succeeded {klog.V(4).Infof(failed to acquire lease %v , desc)
 return
 le.config.Lock.RecordEvent(became leader)
 le.metrics.leaderOn(le.config.Name)
 klog.Infof(successfully acquired lease %v , desc)
 cancel()}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
 return succeeded
//  續約方法,不斷循環續約
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {ctx, cancel := context.WithCancel(ctx)
 defer cancel()
 wait.Until(func() {timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
 defer timeoutCancel()
 err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {done := make(chan bool, 1)
 go func() {defer close(done)
 done  - le.tryAcquireOrRenew()
 select {case  -timeoutCtx.Done():
 return false, fmt.Errorf(failed to tryAcquireOrRenew %s , timeoutCtx.Err())
 case result :=  -done:
 return result, nil
 }, timeoutCtx.Done())
 le.maybeReportTransition()
 desc := le.config.Lock.Describe()
 if err == nil {klog.V(5).Infof(successfully renewed lease %v , desc)
 return
 le.config.Lock.RecordEvent(stopped leading)
 le.metrics.leaderOff(le.config.Name)
 klog.Infof(failed to renew lease %v: %v , desc, err)
 cancel()}, le.config.RetryPeriod, ctx.Done())
 // if we hold the lease, give it up
 if le.config.ReleaseOnCancel {le.release()
}

看完上述內容,你們掌握如何進行 main 方法與 Leader 選舉分析的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注丸趣 TV 行業資訊頻道,感謝各位的閱讀!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-04發表,共計15660字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 理塘县| 三亚市| 房山区| 虹口区| 台前县| 陵水| 会宁县| 汪清县| 庆安县| 阳泉市| 富源县| 绩溪县| 宜兰县| 察隅县| 靖安县| 临汾市| 体育| 西安市| 彰化市| 启东市| 岑溪市| 化隆| 汝城县| 八宿县| 张北县| 霍邱县| 长乐市| 瑞昌市| 长宁县| 平利县| 井陉县| 邯郸市| 西丰县| 曲阳县| 墨江| 西宁市| 三亚市| 海兴县| 旬邑县| 湘潭市| 江山市|