久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何解析client

157次閱讀
沒有評論

共計 7833 個字符,預計需要花費 20 分鐘才能閱讀完成。

今天就跟大家聊聊有關如何解析 client-go 中 workqueue,可能很多人都不太了解,為了讓大家更加了解,丸趣 TV 小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

下面主要講述下 client-go 中 workqueue, 看一下 client-go 的一個整體數據走向. 如下圖:

而 workqueue 主要是在 listener 這里引用,listener 使用 chan 獲取到數據之后將數據放入到工作隊列進行處理。主要是由于 chan 過于簡單,已經無法滿足 K8S 的場景,所以衍生出了 workqueue,

特性

有序

去重

并發

延遲處理

限速

當前有三種 workqueue

基本隊列

延遲隊列

限速隊列

其中延遲隊列是基于基本隊列實現的,而限流隊列基于延遲隊列實現

基本隊列

看一下基本隊列的接口

// client-go 源碼路徑 util/workqueue/queue.go
type Interface interface {
 // 新增元素   可以是任意對象
 Add(item interface{})
 // 獲取當前隊列的長度
 Len() int
 //  阻塞獲取頭部元素 (先入先出)  返回元素以及隊列是否關閉
 Get() (item interface{}, shutdown bool)
 //  顯示標記完成元素的處理
 Done(item interface{})
 // 關閉隊列
 ShutDown()
 // 隊列是否處于關閉狀態
 ShuttingDown() bool}

看一下基本隊列的數據結構, 只看三個重點處理的, 其他的沒有展示出來

type Type struct {
 // 含有所有元素的元素的隊列   保證有序
 queue []t
 // 所有需要處理的元素  set 是基于 map 以 value 為空 struct 實現的結構,保證去重
 dirty set
 // 當前正在處理中的元素
 processing set
 ...
type empty struct{}
type t interface{}
type set map[t]empty

基本隊列的 hello world 也很簡單

 wq := workqueue.New()
 wq.Add(hello)
 v, _ := wq.Get()

基本隊列 Add

func (q *Type) Add(item interface{}) { q.cond.L.Lock()
 defer q.cond.L.Unlock()
 // 如果當前處于關閉狀態, 則不再新增元素
 if q.shuttingDown {
 return
 }
 // 如果元素已經在等待處理中, 則不再新增
 if q.dirty.has(item) {
 return
 }
 // 添加到 metrics
 q.metrics.add(item)
 // 加入等待處理中
 q.dirty.insert(item)
 // 如果目前正在處理該元素   就不將元素添加到隊列
 if q.processing.has(item) {
 return
 }
 q.queue = append(q.queue, item)
 q.cond.Signal()}

基本隊列 Get

func (q *Type) Get() (item interface{}, shutdown bool) { q.cond.L.Lock()
 defer q.cond.L.Unlock()
 // 如果當前沒有元素并且不處于關閉狀態, 則阻塞
 for len(q.queue) == 0   !q.shuttingDown { q.cond.Wait()
 }
 ...
 item, q.queue = q.queue[0], q.queue[1:]
 q.metrics.get(item)
 // 把元素添加到正在處理隊列中
 q.processing.insert(item)
 // 把隊列從等待處理隊列中刪除
 q.dirty.delete(item)
 return item, false
}

基本隊列實例化

func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
 t :=  Type{
 clock: c,
 dirty: set{},
 processing: set{},
 cond: sync.NewCond(sync.Mutex{}),
 metrics: metrics,
 unfinishedWorkUpdatePeriod: updatePeriod,
 }
 // 啟動一個協程   定時更新 metrics
 go t.updateUnfinishedWorkLoop()
 return t
func (q *Type) updateUnfinishedWorkLoop() { t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
 defer t.Stop()
 for range t.C() { if !func() bool { q.cond.L.Lock()
 defer q.cond.L.Unlock()
 if !q.shuttingDown { q.metrics.updateUnfinishedWork()
 return true
 }
 return false
 }() {
 return
 }
 }
}

延遲隊列

延遲隊列的實現思路主要是使用優先隊列存放需要延遲添加的元素, 每次判斷最小延遲的元素書否已經達到了加入隊列的要求 (延遲的時間到了), 如果是則判斷下一個元素, 直到沒有元素或者元素還需要延遲為止。

看一下延遲隊列的數據結構

type delayingType struct {
 Interface
 ...
 // 放置延遲添加的元素
 waitingForAddCh chan *waitFor
 ...
}

主要是使用 chan 來保存延遲添加的元素, 而具體實現是通過一個實現了一個 AddAfter 方法,看一下具體的內容

// 延遲隊列的接口
type DelayingInterface interface {
 Interface
 // AddAfter adds an item to the workqueue after the indicated duration has passed
 AddAfter(item interface{}, duration time.Duration)
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
 ...
 // 如果延遲實現小于等于 0   直接添加到隊列
 if duration  = 0 { q.Add(item)
 return
 }
 select {
 case  -q.stopCh:
 // 添加到 chan, 下面會講一下這個 chan 的處理
 case q.waitingForAddCh  -  waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
 }
}

延遲元素的處理

func (q *delayingType) waitingLoop() { defer utilruntime.HandleCrash()
 never := make(-chan time.Time)
 var nextReadyAtTimer clock.Timer
 waitingForQueue :=  waitForPriorityQueue{}
 // 這里是初始化一個優先隊列   具體實現有興趣的同學可以研究下
 heap.Init(waitingForQueue)
 waitingEntryByData := map[t]*waitFor{}
 for { if q.Interface.ShuttingDown() {
 return
 }
 now := q.clock.Now()
 // Add ready entries
 for waitingForQueue.Len()   0 { entry := waitingForQueue.Peek().(*waitFor)
 // 看一下第一個元素是否已經到達延遲的時間了
 if entry.readyAt.After(now) {
 break
 }
 // 時間到了, 將元素添加到工作的隊列, 并且從延遲的元素中移除
 entry = heap.Pop(waitingForQueue).(*waitFor)
 q.Add(entry.data)
 delete(waitingEntryByData, entry.data)
 }
 // Set up a wait for the first item s readyAt (if one exists)
 nextReadyAt := never
 if waitingForQueue.Len()   0 {
 if nextReadyAtTimer != nil { nextReadyAtTimer.Stop()
 }
 // 如果還有需要延遲的元素, 計算第一個元素的延遲時間 (最小延遲的元素)
 entry := waitingForQueue.Peek().(*waitFor)
 nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
 nextReadyAt = nextReadyAtTimer.C()
 }
 select {
 case  -q.stopCh:
 return
 case  -q.heartbeat.C():
 // 定時檢查下是否有元素達到延遲的時間
 case  -nextReadyAt:
 // 這里是上面計算出來的時間, 時間到了, 處理到達延遲時間的元素
 case waitEntry :=  -q.waitingForAddCh:
 // 檢查是否需要延遲, 如果需要延遲就加入到延遲等待
 if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry)
 } else {
 // 如果不需要延遲就直接添加到隊列
 q.Add(waitEntry.data)
 }
 drained := false
 for !drained {
 select { case waitEntry :=  -q.waitingForAddCh:

上面 waitingLoop 是在實例化延遲隊列的時候調用的,看一下實例化時候的邏輯

func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
 // 實例化一個數據結構
 ret :=  delayingType{ Interface: NewNamed(name),
 clock: clock,
 heartbeat: clock.NewTicker(maxWait),
 stopCh: make(chan struct{}),
 waitingForAddCh: make(chan *waitFor, 1000),
 metrics: newRetryMetrics(name),
 }
 // 放到一個協程中處理延遲元素
 go ret.waitingLoop()
 return ret
}

限速隊列

當前限速隊列支持 4 中限速模式

令牌桶算法限速

排隊指數限速

計數器模式

混合模式 (多種限速算法同時使用)

限速隊列的底層實際上還是通過延遲隊列來進行限速, 通過計算出元素的限速時間作為延遲時間

來看一下限速接口

type RateLimiter interface {
 //
 When(item interface{}) time.Duration
 // Forget indicates that an item is finished being retried. Doesn t matter whether its for perm failing
 // or for success, we ll stop tracking it
 Forget(item interface{})
 // NumRequeues returns back how many failures the item has had
 NumRequeues(item interface{}) int
}

看一下限速隊列的數據結構

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
 DelayingInterface
 // 實際上底層還是調用的延遲隊列, 通過計算出元素的延遲時間   進行限速
 AddRateLimited(item interface{})
 // Forget indicates that an item is finished being retried. Doesn t matter whether it s for perm failing
 // or for success, we ll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
 // still have to call `Done` on the queue.
 Forget(item interface{})
 // NumRequeues returns back how many times the item was requeued
 NumRequeues(item interface{}) int
func (q *rateLimitingType) AddRateLimited(item interface{}) {
 // 通過 when 方法計算延遲加入隊列的時間
 q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

令牌桶算法

client-go 中的令牌桶限速是通過 golang.org/x/time/rat 包來實現的

可以通過 flowcontrol.NewTokenBucketRateLimiter(qps float32, burst int) 來使用令牌桶限速算法,其中第一個參數 qps 表示每秒補充多少 token,burst 表示總 token 上限為多少。

排隊指數算法

排隊指數可以通過 workqueue.NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) 來使用。

這個算法有兩個參數:

baseDelay 基礎限速時間

maxDelay 最大限速時間

舉個例子來理解一下這個算法,例如快速插入 5 個相同元素,baseDelay 設置為 1 秒,maxDelay 設置為 10 秒,都在同一個限速期內。第一個元素會在 1 秒后加入到隊列,第二個元素會在 2 秒后加入到隊列,第三個元素會在 4 秒后加入到隊列,第四個元素會在 8 秒后加入到隊列,第五個元素會在 10 秒后加入到隊列 (指數計算的結果為 16,但是最大值設置了 10 秒)。

來看一下源碼的計算

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock()
 defer r.failuresLock.Unlock()
 // 第一次為 0
 exp := r.failures[item]
 // 累加 1
 r.failures[item] = r.failures[item] + 1
 // 通過當前計數和 baseDelay 計算指數結果  baseDelay*(2 的 exp 次方)
 backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
 if backoff   math.MaxInt64 {
 return r.maxDelay
 }
 calculated := time.Duration(backoff)
 if calculated   r.maxDelay {
 return r.maxDelay
 }
 return calculated
}

計數器模式

計數器模式可以通過 workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) 來使用,有三個參數

fastDelay 快限速時間

slowDelay 慢限速時間

maxFastAttempts 快限速元素個數

原理是這樣的,假設 fastDelay 設置為 1 秒,slowDelay 設置為 10 秒,maxFastAttempts 設置為 3,同樣在一個限速周期內快速插入 5 個相同的元素。前三個元素都是以 1 秒的限速時間加入到隊列,添加第四個元素時開始使用 slowDelay 限速時間,也就是 10 秒后加入到隊列,后面的元素都將以 10 秒的限速時間加入到隊列,直到限速周期結束。

來看一下源碼

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock()
 defer r.failuresLock.Unlock()
 // 添加一次就計數一次
 r.failures[item] = r.failures[item] + 1
 // 計數小于 maxFastAttempts 都以 fastDelay 為限速時間,否則以 slowDelay 為限速時間
 if r.failures[item]  = r.maxFastAttempts {
 return r.fastDelay
 }
 return r.slowDelay
}

混合模式

最后一種是混合模式,可以組合使用不同的限速算法實例化限速隊列

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter { return  MaxOfRateLimiter{limiters: limiters}
}

在 k8s-client-go 的源碼中可以看到,大量的接口組合運用,將各種功能拆分成各個細小的庫,是一種非常值得學習的代碼風格以及思路。

看完上述內容,你們對如何解析 client-go 中 workqueue 有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注丸趣 TV 行業資訊頻道,感謝大家的支持。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計7833字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 和林格尔县| 潮安县| 临城县| 称多县| 高州市| 福鼎市| 象山县| 玉树县| 朝阳市| 当涂县| 万源市| 栾川县| 海淀区| 山西省| 那坡县| 沂南县| 岗巴县| 上杭县| 万盛区| 南阳市| 阿拉善右旗| 东城区| 巴里| 扶绥县| 灌南县| 渭南市| 田阳县| 龙陵县| 和硕县| 安吉县| 九龙坡区| 石门县| 道孚县| 玉门市| 潞西市| 临湘市| 诸暨市| 罗田县| 汤阴县| 蕉岭县| 平利县|