久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

apiserver的list

152次閱讀
沒有評論

共計(jì) 10083 個字符,預(yù)計(jì)需要花費(fèi) 26 分鐘才能閱讀完成。

本篇內(nèi)容主要講解“apiserver 的 list-watch 怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓丸趣 TV 小編來帶大家學(xué)習(xí)“apiserver 的 list-watch 怎么使用”吧!

0. list-watch 的需求

上圖是一個典型的 Pod 創(chuàng)建過程,在這個過程中,每次當(dāng) kubectl 創(chuàng)建了 ReplicaSet 對象后,controller-manager 都是通過 list-watch 這種方式得到了最新的 ReplicaSet 對象,并執(zhí)行自己的邏輯來創(chuàng)建 Pod 對象。其他的幾個組件,Scheduler/Kubelet 也是一樣,通過 list-watch 得知變化并進(jìn)行處理。這是組件的處理端代碼:

c.NodeLister.Store, c.nodePopulator = framework.NewInformer( c.createNodeLW(), ...(1)
 api.Node{}, ...(2)
 0, ...(3)
 framework.ResourceEventHandlerFuncs{ ...(4)
 AddFunc: c.addNodeToCache, ...(5)
 UpdateFunc: c.updateNodeInCache,
 DeleteFunc: c.deleteNodeFromCache,
)

其中 (1) 是 list-watch 函數(shù),(4)(5)則是相應(yīng)事件觸發(fā)操作的入口。

list-watch 操作需要做這么幾件事:

由組件向 apiserver 而不是 etcd 發(fā)起 watch 請求,在組件啟動時(shí)就進(jìn)行訂閱,告訴 apiserver 需要知道什么數(shù)據(jù)發(fā)生變化。Watch 是一個典型的發(fā)布 - 訂閱模式。

組件向 apiserver 發(fā)起的 watch 請求是可以帶條件的,例如,scheduler 想要 watch 的是所有未被調(diào)度的 Pod,也就是滿足 Pod.destNode= 的 Pod 來進(jìn)行調(diào)度操作;而 kubelet 只關(guān)心自己節(jié)點(diǎn)上的 Pod 列表。apiserver 向 etcd 發(fā)起的 watch 是沒有條件的,只能知道某個數(shù)據(jù)發(fā)生了變化或創(chuàng)建、刪除,但不能過濾具體的值。也就是說對象數(shù)據(jù)的條件過濾必須在 apiserver 端而不是 etcd 端完成。

list 是 watch 失敗,數(shù)據(jù)太過陳舊后的彌補(bǔ)手段,這方面詳見 基于 list-watch 的 Kubernetes 異步事件處理框架詳解 - 客戶端部分。list 本身是一個簡單的列表操作,和其它 apiserver 的增刪改操作一樣,不再多描述細(xì)節(jié)。

1. watch 的 API 處理

既然 watch 本身是一個 apiserver 提供的 http restful 的 API,那么就按照 API 的方式去閱讀它的代碼,按照 apiserver 的基礎(chǔ)功能實(shí)現(xiàn)一文所描述,我們來看它的代碼,

關(guān)鍵的處理 API 注冊代碼 pkg/apiserver/api_installer.go

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage,...
 lister, isLister := storage.(rest.Lister)
 watcher, isWatcher := storage.(rest.Watcher) ...(1)
... 
 case  LIST : // List all resources of a kind. ...(2)
 doc :=  list objects of kind   + kind
 if hasSubresource {
 doc =  list   + subresource +   of objects of kind   + kind
 handler := metrics.InstrumentRouteFunc(action.Verb, resource, ListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) ...(3)

一個 rest.Storage 對象會被轉(zhuǎn)換為 watcher 和 lister 對象

提供 list 和 watch 服務(wù)的入口是同一個,在 API 接口中是通過 GET /pods?watch=true 這種方式來區(qū)分是 list 還是 watch

API 處理函數(shù)是由 lister 和 watcher 經(jīng)過 ListResource()合體后完成的。

那么就看看 ListResource()的具體實(shí)現(xiàn)吧,/pkg/apiserver/resthandler.go

func ListResource(r rest.Lister, rw rest.Watcher,... {if (opts.Watch || forceWatch)   rw != nil {watcher, err := rw.Watch(ctx,  opts) ...(1)
 ....
 serveWatch(watcher, scope, req, res, timeout)
 return
 result, err := r.List(ctx,  opts) ...(2) 
 write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)

每次有一個 watch 的 url 請求過來,都會調(diào)用 rw.Watch()創(chuàng)建一個 watcher,好吧這里的名字和上面那一層的名字重復(fù)了,但我們可以區(qū)分開,然后使用 serveWatch()來處理這個請求。watcher 的生命周期是每個 http 請求的,這一點(diǎn)非常重要。

list 在這里是另外一個分支,和 watch 分別處理,可以忽略。

響應(yīng) http 請求的過程 serveWatch()的代碼在 /pkg/apiserver/watch.go 里面

func serveWatch(watcher watch.Interface... {server.ServeHTTP(res.ResponseWriter, req.Request)
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 for {
 select {case event, ok :=  -s.watching.ResultChan():
 obj := event.Object
 if err := s.embeddedEncoder.EncodeToStream(obj, buf); 
}

這段的操作基本毫無技術(shù)含量,就是從 watcher 的結(jié)果 channel 中讀取一個 event 對象,然后持續(xù)不斷的編碼寫入到 http response 的流當(dāng)中。

這是整個過程的圖形化描述:

所以,我們的問題就回到了

watcher 這個對象,嚴(yán)格來說是 watch.Interface 的對象,位置在 pkg/watch/watch.go 中,是怎么被創(chuàng)建出來的?

這個 watcher 對象是怎么從 etcd 中獲得變化的數(shù)據(jù)的?又是怎么過濾條件的?

2. 在代碼迷宮中追尋 watcher

回到上面的代碼追蹤過程來看,watcher(watch.Interface)對象是被 Rest.Storage 對象創(chuàng)建出來的。從上一篇 apiserver 的基礎(chǔ)功能實(shí)現(xiàn) 可以知道,所有的 Rest.Storage 分兩層,一層是每個對象自己的邏輯,另一層則是通過通用的操作來搞定,像 watch 這樣的操作應(yīng)該是通用的,所以我們看這個源代碼

/pkg/registry/generic/registry/store.go

func (e *Store) Watch(ctx api.Context, options *api.ListOptions) (watch.Interface, error) {return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion)
func (e *Store) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {return e.Storage.Watch(ctx, key, resourceVersion, filterFunc) ...(1)
 return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filterFunc)
}

果然,我們在 (1) 這里找到了生成 Watch 的函數(shù),但這個工作是由 e.Storage 來完成的,所以我們需要找一個具體的 Storage 的生成過程,以 Pod 為例子

/pkg/registry/pod/etcd/etcd.go

func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
 prefix :=  /pods 
 storageInterface := opts.Decorator(opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Pods),  api.Pod{}, prefix, pod.Strategy, newListFunc) ...(1)
 store :=  registry.Store{
 ...
 Storage: storageInterface, ...(2)
 return PodStorage{Pod:  REST{store, proxyTransport}, ...(3)

這 (1) 就是 Storage 的生成現(xiàn)場,傳入的參數(shù)包括了一個緩存 Pod 的數(shù)量。(2)(3)是和上面代碼的連接點(diǎn)。那么現(xiàn)在問題就轉(zhuǎn)化為追尋 Decorator 這個東西具體是怎么生成的,需要重復(fù)剛才的過程,往上搜索 opts 是怎么搞進(jìn)來的。

/pkg/master/master.go – GetRESTOptionsOrDie()

/pkg/genericapiserver/genericapiserver.go – StorageDecorator()

/pkg/registry/generic/registry/storage_factory.go – StorageWithCacher()

/pkg/storage/cacher.go

OK,這樣我們就來到正題,一個具體的 watch 緩存的實(shí)現(xiàn)了!

把上面這個過程用一幅圖表示:

3. watch 緩存的具體實(shí)現(xiàn)

看代碼,首要看的是數(shù)據(jù)結(jié)構(gòu),以及考慮這個數(shù)據(jù)結(jié)構(gòu)和需要解決的問題之間的關(guān)系。

3.1 Cacher(pkg/storage/cacher.go)

對于 cacher 這結(jié)構(gòu)來說,我們從外看需求,可以知道這是一個 Storage,用于提供某個類型的數(shù)據(jù),例如 Pod 的增刪改查請求,同時(shí)它又用于 watch,用于在 client 端需要對某個 key 的變化感興趣時(shí),創(chuàng)建一個 watcher 來源源不斷的提供新的數(shù)據(jù)給客戶端。

那么 cacher 是怎么滿足這些需求的呢?答案就在它的結(jié)構(gòu)里面:

type Cacher struct {
 // Underlying storage.Interface.
 storage Interface
 //  sliding window  of recent changes of objects and the current state.
 watchCache *watchCache
 reflector *cache.Reflector
 // Registered watchers.
 watcherIdx int
 watchers map[int]*cacheWatcher
}

略去里面的鎖(在看代碼的時(shí)候一開始要忽略鎖的存在,鎖是后期為了避免破壞數(shù)據(jù)再加上去的,不影響數(shù)據(jù)流),略去里面的一些非關(guān)鍵的成員,現(xiàn)在我們剩下這 3 段重要的成員,其中

storage 是連接 etcd 的,也就是背后的裸存儲

watchCache 并不僅僅是和注釋里面說的那樣,是個滑動窗口,里面存儲了所有數(shù)據(jù) + 滑動窗口

watchers 這是為每個請求創(chuàng)建的 struct,每個 watch 的 client 上來后都會被創(chuàng)建一個,所以這里有個 map

當(dāng)然,這 3 個成員的作用是我看了所有代碼后,總結(jié)出來的,一開始讀代碼時(shí)不妨先在腦子里面有個定位,然后在看下面的方法時(shí)不斷修正這個定位。那么,接下來就看看具體的方法是怎么讓數(shù)據(jù)在這些結(jié)構(gòu)里面流動的吧!

初始化方法

func NewCacherFromConfig(config CacherConfig) *Cacher { 
 cacher.startCaching(stopCh)
func (c *Cacher) startCaching(stopChannel  -chan struct{}) {if err := c.reflector.ListAndWatch(stopChannel); err != nil {glog.Errorf( unexpected ListAndWatch error: %v , err)
}

其他的部分都是陳詞濫調(diào),只有 startCaching()這段有點(diǎn)意思,這里啟動一個 go 協(xié)程,最后啟動了 c.reflector.ListAndWatch()這個方法,如果對 k8s 的基本有了解的話,這個其實(shí)就是一個把遠(yuǎn)端數(shù)據(jù)源源不斷的同步到本地的方法,那么數(shù)據(jù)落在什么地方呢?往上看可以看到

reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),

也就是說從創(chuàng)建 cacher 的實(shí)例開始,就會從 etcd 中把所有 Pod 的數(shù)據(jù)同步到 watchCache 里面來。這也就印證了 watchCache 是數(shù)據(jù)從 etcd 過來的第一站。

增刪改方法

func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {return c.storage.Create(ctx, key, obj, out, ttl)
}

大部分方法都很無聊,就是短路到底層的 storage 直接執(zhí)行。

Watch 方法

// Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) {initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
 watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
 c.watchers[c.watcherIdx] = watcher
 c.watcherIdx++
 return watcher, nil
}

這里的邏輯就比較清晰,首先從 watchCache 中拿到從某個 resourceVersion 以來的所有數(shù)據(jù)——initEvents,然后用這個數(shù)據(jù)創(chuàng)建了一個 watcher 返回出去為某個客戶端提供服務(wù)。

List 方法

// Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error {filterFunc := filterFunction(key, c.keyFunc, filter)
 objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
 if err != nil {return fmt.Errorf( failed to wait for fresh list: %v , err)
 for _, obj := range objs {if filterFunc(object) {listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
}

從這段代碼中我們可以看出 2 件事,一是 list 的數(shù)據(jù)都是從 watchCache 中獲取的,二是獲取后通過 filterFunc 過濾了一遍然后返回出去。

3.2 WatchCache(pkg/storage/watch_cache.go)

這個結(jié)構(gòu)應(yīng)該是緩存的核心結(jié)構(gòu),從上一層的代碼分析中我們已經(jīng)知道了對這個結(jié)構(gòu)的需求,包括存儲所有這個類型的數(shù)據(jù),包括當(dāng)有新的數(shù)據(jù)過來時(shí)把數(shù)據(jù)扔到 cacheWatcher 里面去,總之,提供 List 和 Watch 兩大輸出。

type watchCache struct {
 // cache is used a cyclic buffer - its first element (with the smallest
 // resourceVersion) is defined by startIndex, its last element is defined
 // by endIndex (if cache is full it will be startIndex + capacity).
 // Both startIndex and endIndex can be greater than buffer capacity -
 // you should always apply modulo capacity to get an index in cache array.
 cache []watchCacheElement
 startIndex int
 endIndex int
 // store will effectively support LIST operation from the  end of cache
 // history  i.e. from the moment just after the newest cached watched event.
 // It is necessary to effectively allow clients to start watching at now.
 store cache.Store
}

這里的關(guān)鍵數(shù)據(jù)結(jié)構(gòu)依然是 2 個

cache 環(huán)形隊(duì)列,存儲有限個數(shù)的最新數(shù)據(jù)

store 底層實(shí)際上是個線程安全的 hashMap,存儲全量數(shù)據(jù)

那么繼續(xù)看看方法是怎么運(yùn)轉(zhuǎn)的吧~

增刪改方法

func (w *watchCache) Update(obj interface{}) error {event := watch.Event{Type: watch.Modified, Object: object}
 f := func(obj runtime.Object) error { return w.store.Update(obj) }
 return w.processEvent(event, resourceVersion, f)

func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error {previous, exists, err := w.store.Get(event.Object) watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion} w.onEvent(watchCacheEvent) w.updateCache(resourceVersion, watchCacheEvent) // Assumes that lock is already held for write. func (w *watchCache) updateCache(resourceVersion uint64, event watchCacheEvent) {w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event} w.endIndex++ }

所有的增刪改方法做的事情都差不多,就是在 store 里面存具體的數(shù)據(jù),然后調(diào)用 processEvent()去增加環(huán)形隊(duì)列里面的數(shù)據(jù),如果詳細(xì)看一下 onEvent 的操作,就會發(fā)現(xiàn)這個操作的本質(zhì)是落在 cacher.go 里面:

func (c *Cacher) processEvent(event watchCacheEvent) {
 for _, watcher := range c.watchers {watcher.add(event)
}

往所有的 watcher 里面挨個添加數(shù)據(jù)。總體來說,我們可以從上面的代碼中得出一個結(jié)論:cache 里面存儲的是 Event,也就是有 prevObject 的,對于所有操作都會在 cache 里面保存,但對于 store 來說,只存儲當(dāng)下的數(shù)據(jù),刪了就刪了,改了就改了。

WaitUntilFreshAndList()

這里本來應(yīng)該討論 List()方法的,但在 cacher 里面的 List()實(shí)際上使用的是這個,所以我們看這個方法。

func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) {startTime := w.clock.Now()
 go func() {w.cond.Broadcast()
 for w.resourceVersion   resourceVersion {w.cond.Wait()
 return w.store.List(), w.resourceVersion, nil}

這個方法比較繞,前面使用了一堆 cond 通知來和其他協(xié)程通信,最后還是調(diào)用了 store.List()把數(shù)據(jù)返回出去。后面來具體分析這里的協(xié)調(diào)機(jī)制。

GetAllEventsSinceThreadUnsafe()

這個方法在 cacher 的創(chuàng)建 cacheWatcher 里面使用,把當(dāng)前 store 里面的所有數(shù)據(jù)都搞出來,然后把 store 里面的數(shù)據(jù)都轉(zhuǎn)換為 AddEvent,配上 cache 里面的 Event,全部返回出去。

3.3 CacheWatcher(pkg/storage/cacher.go)

這個結(jié)構(gòu)是每個 watch 的 client 都會擁有一個的,從上面的分析中我們也能得出這個結(jié)構(gòu)的需求,就是從 watchCache 里面搞一些數(shù)據(jù),然后寫到客戶端那邊。

// cacherWatch implements watch.Interface
type cacheWatcher struct {
 sync.Mutex
 input chan watchCacheEvent
 result chan watch.Event
 filter FilterFunc
 stopped bool
 forget func(bool)
}

這段代碼比較簡單,就不去分析方法了,簡單說就是數(shù)據(jù)在增加的時(shí)候放到 input 這個 channel 里面去,通過 filter 然后輸出到 result 這個 channel 里面去。

到此,相信大家對“apiserver 的 list-watch 怎么使用”有了更深的了解,不妨來實(shí)際操作一番吧!這里是丸趣 TV 網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-16發(fā)表,共計(jì)10083字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 寿阳县| 祥云县| 宝兴县| 长武县| 永福县| 和静县| 定西市| 澄城县| 买车| 鄂伦春自治旗| 论坛| 华坪县| 台南县| 巴青县| 重庆市| 卢湾区| 军事| 澄迈县| 曲阳县| 清镇市| 抚远县| 内江市| 苏尼特左旗| 雷波县| 大港区| 巴马| 馆陶县| 长春市| 栾川县| 九龙坡区| 西乡县| 禄劝| 沁源县| 阳原县| 文安县| 安顺市| 渑池县| 阿拉善右旗| 河源市| 金门县| 宁城县|