共計 7833 個字符,預計需要花費 20 分鐘才能閱讀完成。
今天就跟大家聊聊有關如何解析 client-go 中 workqueue,可能很多人都不太了解,為了讓大家更加了解,丸趣 TV 小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
下面主要講述下 client-go 中 workqueue, 看一下 client-go 的一個整體數據走向. 如下圖:
而 workqueue 主要是在 listener 這里引用,listener 使用 chan 獲取到數據之后將數據放入到工作隊列進行處理。主要是由于 chan 過于簡單,已經無法滿足 K8S 的場景,所以衍生出了 workqueue,
特性
有序
去重
并發
延遲處理
限速
當前有三種 workqueue
基本隊列
延遲隊列
限速隊列
其中延遲隊列是基于基本隊列實現的,而限流隊列基于延遲隊列實現
基本隊列
看一下基本隊列的接口
// client-go 源碼路徑 util/workqueue/queue.go
type Interface interface {
// 新增元素 可以是任意對象
Add(item interface{})
// 獲取當前隊列的長度
Len() int
// 阻塞獲取頭部元素 (先入先出) 返回元素以及隊列是否關閉
Get() (item interface{}, shutdown bool)
// 顯示標記完成元素的處理
Done(item interface{})
// 關閉隊列
ShutDown()
// 隊列是否處于關閉狀態
ShuttingDown() bool}
看一下基本隊列的數據結構, 只看三個重點處理的, 其他的沒有展示出來
type Type struct {
// 含有所有元素的元素的隊列 保證有序
queue []t
// 所有需要處理的元素 set 是基于 map 以 value 為空 struct 實現的結構,保證去重
dirty set
// 當前正在處理中的元素
processing set
...
type empty struct{}
type t interface{}
type set map[t]empty
基本隊列的 hello world 也很簡單
wq := workqueue.New()
wq.Add(hello)
v, _ := wq.Get()
基本隊列 Add
func (q *Type) Add(item interface{}) { q.cond.L.Lock()
defer q.cond.L.Unlock()
// 如果當前處于關閉狀態, 則不再新增元素
if q.shuttingDown {
return
}
// 如果元素已經在等待處理中, 則不再新增
if q.dirty.has(item) {
return
}
// 添加到 metrics
q.metrics.add(item)
// 加入等待處理中
q.dirty.insert(item)
// 如果目前正在處理該元素 就不將元素添加到隊列
if q.processing.has(item) {
return
}
q.queue = append(q.queue, item)
q.cond.Signal()}
基本隊列 Get
func (q *Type) Get() (item interface{}, shutdown bool) { q.cond.L.Lock()
defer q.cond.L.Unlock()
// 如果當前沒有元素并且不處于關閉狀態, 則阻塞
for len(q.queue) == 0 !q.shuttingDown { q.cond.Wait()
}
...
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
// 把元素添加到正在處理隊列中
q.processing.insert(item)
// 把隊列從等待處理隊列中刪除
q.dirty.delete(item)
return item, false
}
基本隊列實例化
func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
t := Type{
clock: c,
dirty: set{},
processing: set{},
cond: sync.NewCond(sync.Mutex{}),
metrics: metrics,
unfinishedWorkUpdatePeriod: updatePeriod,
}
// 啟動一個協程 定時更新 metrics
go t.updateUnfinishedWorkLoop()
return t
func (q *Type) updateUnfinishedWorkLoop() { t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
defer t.Stop()
for range t.C() { if !func() bool { q.cond.L.Lock()
defer q.cond.L.Unlock()
if !q.shuttingDown { q.metrics.updateUnfinishedWork()
return true
}
return false
}() {
return
}
}
}
延遲隊列
延遲隊列的實現思路主要是使用優先隊列存放需要延遲添加的元素, 每次判斷最小延遲的元素書否已經達到了加入隊列的要求 (延遲的時間到了), 如果是則判斷下一個元素, 直到沒有元素或者元素還需要延遲為止。
看一下延遲隊列的數據結構
type delayingType struct {
Interface
...
// 放置延遲添加的元素
waitingForAddCh chan *waitFor
...
}
主要是使用 chan 來保存延遲添加的元素, 而具體實現是通過一個實現了一個 AddAfter 方法,看一下具體的內容
// 延遲隊列的接口
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
...
// 如果延遲實現小于等于 0 直接添加到隊列
if duration = 0 { q.Add(item)
return
}
select {
case -q.stopCh:
// 添加到 chan, 下面會講一下這個 chan 的處理
case q.waitingForAddCh - waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
延遲元素的處理
func (q *delayingType) waitingLoop() { defer utilruntime.HandleCrash()
never := make(-chan time.Time)
var nextReadyAtTimer clock.Timer
waitingForQueue := waitForPriorityQueue{}
// 這里是初始化一個優先隊列 具體實現有興趣的同學可以研究下
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}
for { if q.Interface.ShuttingDown() {
return
}
now := q.clock.Now()
// Add ready entries
for waitingForQueue.Len() 0 { entry := waitingForQueue.Peek().(*waitFor)
// 看一下第一個元素是否已經到達延遲的時間了
if entry.readyAt.After(now) {
break
}
// 時間到了, 將元素添加到工作的隊列, 并且從延遲的元素中移除
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
// Set up a wait for the first item s readyAt (if one exists)
nextReadyAt := never
if waitingForQueue.Len() 0 {
if nextReadyAtTimer != nil { nextReadyAtTimer.Stop()
}
// 如果還有需要延遲的元素, 計算第一個元素的延遲時間 (最小延遲的元素)
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}
select {
case -q.stopCh:
return
case -q.heartbeat.C():
// 定時檢查下是否有元素達到延遲的時間
case -nextReadyAt:
// 這里是上面計算出來的時間, 時間到了, 處理到達延遲時間的元素
case waitEntry := -q.waitingForAddCh:
// 檢查是否需要延遲, 如果需要延遲就加入到延遲等待
if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
// 如果不需要延遲就直接添加到隊列
q.Add(waitEntry.data)
}
drained := false
for !drained {
select { case waitEntry := -q.waitingForAddCh:
上面 waitingLoop 是在實例化延遲隊列的時候調用的,看一下實例化時候的邏輯
func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
// 實例化一個數據結構
ret := delayingType{ Interface: NewNamed(name),
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
}
// 放到一個協程中處理延遲元素
go ret.waitingLoop()
return ret
}
限速隊列
當前限速隊列支持 4 中限速模式
令牌桶算法限速
排隊指數限速
計數器模式
混合模式 (多種限速算法同時使用)
限速隊列的底層實際上還是通過延遲隊列來進行限速, 通過計算出元素的限速時間作為延遲時間
來看一下限速接口
type RateLimiter interface {
//
When(item interface{}) time.Duration
// Forget indicates that an item is finished being retried. Doesn t matter whether its for perm failing
// or for success, we ll stop tracking it
Forget(item interface{})
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int
}
看一下限速隊列的數據結構
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface
// 實際上底層還是調用的延遲隊列, 通過計算出元素的延遲時間 進行限速
AddRateLimited(item interface{})
// Forget indicates that an item is finished being retried. Doesn t matter whether it s for perm failing
// or for success, we ll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})
// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
func (q *rateLimitingType) AddRateLimited(item interface{}) {
// 通過 when 方法計算延遲加入隊列的時間
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
令牌桶算法
client-go 中的令牌桶限速是通過 golang.org/x/time/rat 包來實現的
可以通過 flowcontrol.NewTokenBucketRateLimiter(qps float32, burst int) 來使用令牌桶限速算法,其中第一個參數 qps 表示每秒補充多少 token,burst 表示總 token 上限為多少。
排隊指數算法
排隊指數可以通過 workqueue.NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) 來使用。
這個算法有兩個參數:
baseDelay 基礎限速時間
maxDelay 最大限速時間
舉個例子來理解一下這個算法,例如快速插入 5 個相同元素,baseDelay 設置為 1 秒,maxDelay 設置為 10 秒,都在同一個限速期內。第一個元素會在 1 秒后加入到隊列,第二個元素會在 2 秒后加入到隊列,第三個元素會在 4 秒后加入到隊列,第四個元素會在 8 秒后加入到隊列,第五個元素會在 10 秒后加入到隊列 (指數計算的結果為 16,但是最大值設置了 10 秒)。
來看一下源碼的計算
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 第一次為 0
exp := r.failures[item]
// 累加 1
r.failures[item] = r.failures[item] + 1
// 通過當前計數和 baseDelay 計算指數結果 baseDelay*(2 的 exp 次方)
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated r.maxDelay {
return r.maxDelay
}
return calculated
}
計數器模式
計數器模式可以通過 workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) 來使用,有三個參數
fastDelay 快限速時間
slowDelay 慢限速時間
maxFastAttempts 快限速元素個數
原理是這樣的,假設 fastDelay 設置為 1 秒,slowDelay 設置為 10 秒,maxFastAttempts 設置為 3,同樣在一個限速周期內快速插入 5 個相同的元素。前三個元素都是以 1 秒的限速時間加入到隊列,添加第四個元素時開始使用 slowDelay 限速時間,也就是 10 秒后加入到隊列,后面的元素都將以 10 秒的限速時間加入到隊列,直到限速周期結束。
來看一下源碼
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 添加一次就計數一次
r.failures[item] = r.failures[item] + 1
// 計數小于 maxFastAttempts 都以 fastDelay 為限速時間,否則以 slowDelay 為限速時間
if r.failures[item] = r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}
混合模式
最后一種是混合模式,可以組合使用不同的限速算法實例化限速隊列
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter { return MaxOfRateLimiter{limiters: limiters}
}
在 k8s-client-go 的源碼中可以看到,大量的接口組合運用,將各種功能拆分成各個細小的庫,是一種非常值得學習的代碼風格以及思路。
看完上述內容,你們對如何解析 client-go 中 workqueue 有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注丸趣 TV 行業資訊頻道,感謝大家的支持。