久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何解析k8s中的Informer機(jī)制

共計(jì) 6163 個(gè)字符,預(yù)計(jì)需要花費(fèi) 16 分鐘才能閱讀完成。

如何解析 k8s 中的 Informer 機(jī)制,相信很多沒有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。

Informer 機(jī)制架構(gòu)設(shè)計(jì)總覽

下面是我根據(jù)理解畫的一個(gè)數(shù)據(jù)流轉(zhuǎn)圖, 從全局視角看一下數(shù)據(jù)的整體走向是怎么樣的。

其中虛線的表示的是代碼中的方法。

首先講一個(gè)結(jié)論:

通過 Informer 機(jī)制獲取數(shù)據(jù)的情況下,在初始化的時(shí)候會(huì)從 Kubernetes API Server 獲取對(duì)應(yīng) Resource 的全部 Object,后續(xù)只會(huì)通過 Watch 機(jī)制接收 API Server 推送過來(lái)的數(shù)據(jù),不會(huì)再主動(dòng)從 API Server 拉取數(shù)據(jù),直接使用本地緩存中的數(shù)據(jù)以減少 API Server 的壓力。

Watch 機(jī)制基于 HTTP 的 Chunk 實(shí)現(xiàn),維護(hù)一個(gè)長(zhǎng)連接,這是一個(gè)優(yōu)化點(diǎn),減少請(qǐng)求的數(shù)據(jù)量。第二個(gè)優(yōu)化點(diǎn)是 SharedInformer, 它可以讓同一種資源使用的是同一個(gè) Informer,例如 v1 版本的 Deployment 和 v1beta1 版本的 Deployment 同時(shí)存在的時(shí)候,共享一個(gè) Informer。

上面圖中可以看到 Informer 分為三個(gè)部分,可以理解為三大邏輯。

其中 Reflector 主要是把從 API Server 數(shù)據(jù)獲取到的數(shù)據(jù)放到 DeltaFIFO 隊(duì)列中,充當(dāng)生產(chǎn)者角色。

SharedInformer 主要是從 DeltaFIFIO 隊(duì)列中獲取數(shù)據(jù)并分發(fā)數(shù)據(jù),充當(dāng)消費(fèi)者角色。

最后 Indexer 是作為本地緩存的存儲(chǔ)組件存在。

Reflector 理解

Reflector 中主要看 Run、ListAndWatch、watchHandler 三個(gè)地方就足夠了。

源碼位置是 tools/cache/reflector.go

// Ruvn starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
// 開始時(shí)執(zhí)行 Run,上一層調(diào)用的地方是  controller.go 中的 Run 方法
func (r *Reflector) Run(stopCh  -chan struct{}) {
 
 klog.V(3).Infof(Starting reflector %v (%s) from %s , r.expectedTypeName, r.resyncPeriod, r.name)
 wait.Until(func() {
 // 啟動(dòng)后執(zhí)行一次 ListAndWatch
 if err := r.ListAndWatch(stopCh); err != nil { utilruntime.HandleError(err)
 }
 }, r.period, stopCh)
// and then use the resource version to watch.
// It returns error if ListAndWatch didn t even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh  -chan struct{}) error {
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
 // list request will return the full response.
 pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
// 這里是調(diào)用了各個(gè)資源中的 ListFunc 函數(shù), 例如如果 v1 版本的 Deployment
// 則調(diào)用的是 informers/apps/v1/deployment.go 中的 ListFunc
 return r.listerWatcher.List(opts)
 }))
 if r.WatchListPageSize != 0 {
 pager.Pa1geSize = r.WatchListPageSize
 }
 // Pager falls back to full list if paginated list calls fail due to an  Expired

數(shù)據(jù)的生產(chǎn)就結(jié)束了,就兩點(diǎn):

初始化時(shí)從 API Server 請(qǐng)求數(shù)據(jù)

監(jiān)聽后續(xù)從 Watch 推送來(lái)的數(shù)據(jù)

DeltaFIFO 理解

先看一下數(shù)據(jù)結(jié)構(gòu):

type DeltaFIFO struct { items map[string]Deltas
 queue []string
type Delta struct {
 Type DeltaType
 Object interface{}
type Deltas []Delta

 Added DeltaType =  Added  Updated DeltaType =  Updated  Deleted DeltaType =  Deleted  Sync DeltaType =  Sync )

其中 queue 存儲(chǔ)的是 Object 的 id, 而 items 存儲(chǔ)的是以 ObjectID 為 key 的這個(gè) Object 的事件列表,

可以想象到是這樣的一個(gè)數(shù)據(jù)結(jié)構(gòu), 左邊是 Key, 右邊是一個(gè)數(shù)組對(duì)象, 其中每個(gè)元素都是由 type 和 obj 組成.

DeltaFIFO 顧名思義存放 Delta 數(shù)據(jù)的先入先出隊(duì)列,相當(dāng)于一個(gè)數(shù)據(jù)的中轉(zhuǎn)站,將數(shù)據(jù)從一個(gè)地方轉(zhuǎn)移另一個(gè)地方。

主要看的內(nèi)容是 queueActionLocked、Pop、Resync

queueActionLocked 方法:

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { newDeltas := append(f.items[id], Delta{actionType, obj})
 // 去重處理
 newDeltas = dedupDeltas(newDeltas)
 if len(newDeltas)   0 {
 ... 
 //pop 消息
 
 f.cond.Broadcast()
 ...
 return nil
}

Pop 方法:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock()
 defer f.lock.Unlock()
 for { for len(f.queue) == 0 { // 阻塞   直到調(diào)用了 f.cond.Broadcast()
 f.cond.Wait()
 }
// 取出第一個(gè)元素
 id := f.queue[0]
 f.queue = f.queue[1:]
 ...
 item, ok := f.items[id]
 delete(f.items, id)
 // 這個(gè) process 可以在 controller.go 中的 processLoop() 找到
 // 初始化是在 shared_informer.go 的 Run
 // 最終執(zhí)行到 shared_informer.go 的 HandleDeltas 方法
 err := process(item)
 // 如果處理出錯(cuò)了重新放回隊(duì)列中
 if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item)
 err = e.Err
 }
 ...
 }
}

Resync 機(jī)制:

小總結(jié):每次從本地緩存 Indexer 中獲取數(shù)據(jù)重新放到 DeltaFIFO 中執(zhí)行任務(wù)邏輯。

啟動(dòng)的 Resync 地方是 reflector.go 的 resyncChan() 方法,在 reflector.go 的 ListAndWatch 方法中的調(diào)用開始定時(shí)執(zhí)行。

go func() {
 // 啟動(dòng)定時(shí)任務(wù)
 resyncCh, cleanup := r.resyncChan()
 defer func() { cleanup() // Call the last one written into cleanup
 }()
 for {
 select {
 case  -resyncCh:
 case  -stopCh:
 return
 case  -cancelCh:
 return
 }
 // 定時(shí)執(zhí)行   調(diào)用會(huì)執(zhí)行到 delta_fifo.go 的 Resync() 方法
 if r.ShouldResync == nil || r.ShouldResync() { klog.V(4).Infof(%s: forcing resync , r.name)
 if err := r.store.Resync(); err != nil {
 resyncerrc  - err
 return
 }
 }
 cleanup()
 resyncCh, cleanup = r.resyncChan()
 }
 }()
func (f *DeltaFIFO) Resync() error {
 ...
// 從緩存中獲取到所有的 key
 keys := f.knownObjects.ListKeys()
 for _, k := range keys { if err := f.syncKeyLocked(k); err != nil {
 return err
 }
 }
 return nil

func (f *DeltaFIFO) syncKeyLocked(key string) error {  // 獲緩存拿到對(duì)應(yīng)的 Object  obj, exists, err := f.knownObjects.GetByKey(key)  ...  // 放入到隊(duì)列中執(zhí)行任務(wù)邏輯  if err := f.queueActionLocked(Sync, obj); err != nil { return fmt.Errorf( couldn t queue object: %v , err)  }  return nil }

SharedInformer 消費(fèi)消息理解

主要看 HandleDeltas 方法就好,消費(fèi)消息然后分發(fā)數(shù)據(jù)并且存儲(chǔ)數(shù)據(jù)到緩存的地方

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock()
 defer s.blockDeltas.Unlock()
 // from oldest to newest
 for _, d := range obj.(Deltas) {
 
 switch d.Type {
 case Sync, Added, Updated:
 ...
 // 查一下是否在 Indexer 緩存中   如果在緩存中就更新緩存中的對(duì)象
 if old, exists, err := s.indexer.Get(d.Object); err == nil   exists { if err := s.indexer.Update(d.Object); err != nil {
 return err
 }
 // 把數(shù)據(jù)分發(fā)到 Listener
 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
 } else {
 // 沒有在 Indexer 緩存中   把對(duì)象插入到緩存中
 if err := s.indexer.Add(d.Object); err != nil {
 return err
 }
 s.processor.distribute(addNotification{newObj: d.Object}, isSync)
 }
 ...
 }
 }
 return nil
}

Indexer 理解

這塊不會(huì)講述太多內(nèi)容,因?yàn)槲艺J(rèn)為 Informer 機(jī)制最主要的還是前面數(shù)據(jù)的流轉(zhuǎn),當(dāng)然這并不代表數(shù)據(jù)存儲(chǔ)不重要,而是先理清楚整體的思路,后續(xù)再詳細(xì)更新存儲(chǔ)的部分。

Indexer 使用的是 threadsafe_store.go 中的 threadSafeMap 存儲(chǔ)數(shù)據(jù),是一個(gè)線程安全并且?guī)в兴饕δ艿?map, 數(shù)據(jù)只會(huì)存放在內(nèi)存中,每次涉及操作都會(huì)進(jìn)行加鎖。

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
 lock sync.RWMutex
 items map[string]interface{}
 indexers Indexers
 indices Indices
}

Indexer 還有一個(gè)索引相關(guān)的內(nèi)容就暫時(shí)不展開講述。

Example 代碼

-------------
package main
import (
  flag 
  fmt 
  path/filepath 
  time 
 v1  k8s.io/api/apps/v1 
  k8s.io/apimachinery/pkg/labels 
  k8s.io/client-go/informers 
  k8s.io/client-go/kubernetes 
  k8s.io/client-go/rest 
  k8s.io/client-go/tools/cache 
  k8s.io/client-go/tools/clientcmd 
  k8s.io/client-go/util/homedir 
func main() {
 var err error
 var config *rest.Config
 var kubeconfig *string
 if home := homedir.HomeDir(); home != 

以上示例代碼中程序啟動(dòng)后會(huì)拉取一次 Deployment 數(shù)據(jù),并且拉取數(shù)據(jù)完成后從本地緩存中 List 一次 default 命名空間的 Deployment 資源并打印,然后每 60 秒 Resync 一次 Deployment 資源。

QA

為什么需要 Resync?

在本周有同學(xué)提出一個(gè),我看到這個(gè)問題后也感覺挺奇怪的,因?yàn)?Resync 是從本地緩存的數(shù)據(jù)緩存到本地緩存 (從開始到結(jié)束來(lái)說(shuō)是這樣), 為什么需要把數(shù)據(jù)拿出來(lái)又走一遍流程呢?當(dāng)時(shí)鉆牛角尖也是想不明白,后來(lái)?yè)Q個(gè)角度想就知道了。

數(shù)據(jù)從 API Server 過來(lái)并且經(jīng)過處理后放到緩存中,但數(shù)據(jù)并不一定就可以正常處理,也就是說(shuō)可能報(bào)錯(cuò)了,而這個(gè) Resync 相當(dāng)于一個(gè)重試的機(jī)制。

可以嘗試實(shí)踐一下: 部署有狀態(tài)服務(wù),存儲(chǔ)使用 LocalPV(也可以換成自己熟悉的), 這時(shí)候 pod 會(huì)由于存儲(chǔ)目錄不存在而啟動(dòng)失敗. 然后在 pod 啟動(dòng)失敗后再創(chuàng)建好對(duì)應(yīng)的目錄,過一會(huì) pod 就啟動(dòng)成功了。

看完上述內(nèi)容,你們掌握如何解析 k8s 中的 Informer 機(jī)制的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注丸趣 TV 行業(yè)資訊頻道,感謝各位的閱讀!

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-16發(fā)表,共計(jì)6163字。
轉(zhuǎn)載說(shuō)明:除特殊說(shuō)明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請(qǐng)注明出處。
評(píng)論(沒有評(píng)論)
主站蜘蛛池模板: 尚志市| 沭阳县| 吕梁市| 昭觉县| 喀喇沁旗| 青海省| 观塘区| 星子县| 大邑县| 忻城县| 襄垣县| 板桥市| 闽侯县| 虹口区| 济南市| 涞源县| 平顺县| 大理市| 长顺县| 自治县| 连城县| 抚松县| 高阳县| 郯城县| 木里| 罗定市| 乌拉特后旗| 三江| 随州市| 龙岩市| 麻城市| 错那县| 威远县| 武山县| 大丰市| 崇义县| 桦甸市| 酉阳| 云阳县| 兴城市| 冀州市|