共計 7082 個字符,預計需要花費 18 分鐘才能閱讀完成。
本篇內容介紹了“Istio Pilot 代碼是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓丸趣 TV 小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
Istio Pilot 組件介紹
在 Istio 架構中,Pilot 組件屬于最核心的組件,負責了服務網格中的流量管理以及控制面和數據面之間的配置下發。Pilot 內部的代碼結構比較復雜,本文中我們將通過對 Pilot 的代碼的深入分析來了解 Pilot 實現原理。
首先我們來看一下 Pilot 在 Istio 中的功能定位,Pilot 將服務信息和配置數據轉換為 xDS 接口的標準數據結構,通過 gRPC 下發到數據面的 Envoy。如果把 Pilot 看成一個處理數據的黑盒,則其有兩個輸入,一個輸出:
目前 Pilot 的輸入包括兩部分數據來源:
服務數據:來源于各個服務注冊表 (Service Registry),例如 Kubernetes 中注冊的 Service,Consul Catalog 中的服務等。
配置規則:各種配置規則,包括路由規則及流量管理規則等,通過 Kubernetes CRD(Custom Resources Definition) 形式定義并存儲在 Kubernetes 中。
Pilot 的輸出為符合 xDS 接口的數據面配置數據,并通過 gRPC Streaming 接口將配置數據推送到數據面的 Envoy 中。
備注:Istio 代碼庫在不停變化更新中,本文分析所基于的代碼 commit 為: d539abe00c2599d80c6d64296f78d3bb8ab4b033
Pilot-Discovery 代碼結構
Istio Pilot 的代碼分為 Pilot-Discovery 和 Pilot-Agent,其中 Pilot-Agent 用于在數據面負責 Envoy 的生命周期管理,Pilot-Discovery 才是控制面進行流量管理的組件,本文將重點分析控制面部分,即 Pilot-Discovery 的代碼。
Pilot-Discovery 的入口函數為:pilot/cmd/pilot-discovery/main.go 中的 main 方法。main 方法中創建了 Discovery Server,Discovery Server 中主要包含三部分邏輯:
Config Controller
Config Controller 用于管理各種配置數據,包括用戶創建的流量管理規則和策略。Istio 目前支持三種類型的 Config Controller:
Kubernetes:使用 Kubernetes 來作為配置數據的存儲,該方式直接依附于 Kubernetes 強大的 CRD 機制來存儲配置數據,簡單方便,是 Istio 最開始使用的配置存儲方案。
MCP (Mesh Configuration Protocol):使用 Kubernetes 來存儲配置數據導致了 Istio 和 Kubernetes 的耦合,限制了 Istio 在非 Kubernetes 環境下的運用。為了解決該耦合,Istio 社區提出了 MCP,MCP 定義了一個向 Istio 控制面下發配置數據的標準協議,Istio Pilot 作為 MCP Client,任何實現了 MCP 協議的 Server 都可以通過 MCP 協議向 Pilot 下發配置,從而解除了 Istio 和 Kubernetes 的耦合。如果想要了解更多關于 MCP 的內容,請參考文后的鏈接。
Memory:一個在內存中的 Config Controller 實現,主要用于測試。
目前 Istio 的配置包括:
Virtual Service: 定義流量路由規則。
Destination Rule: 定義和一個服務或者 subset 相關的流量處理規則,包括負載均衡策略,連接池大小,斷路器設置,subset 定義等等。
Gateway: 定義入口網關上對外暴露的服務。
Service Entry: 通過定義一個 Service Entry 可以將一個外部服務手動添加到服務網格中。
Envoy Filter: 通過 Pilot 在 Envoy 的配置中添加一個自定義的 Filter。
Service Controller
Service Controller 用于管理各種 Service Registry,提出服務發現數據,目前 Istio 支持的 Service Registry 包括:
Kubernetes:對接 Kubernetes Registry,可以將 Kubernetes 中定義的 Service 和 Instance 采集到 Istio 中。
Consul:對接 Consul Catalog,將 Consul 中定義的 Service 采集到 Istio 中。
MCP:和 MCP config controller 類似,從 MCP Server 中獲取 Service 和 Service Instance。
Memory:一個內存中的 Service Controller 實現,主要用于測試。
Discovery Service
Discovery Service 中主要包含下述邏輯:
啟動 gRPC Server 并接收來自 Envoy 端的連接請求。
接收 Envoy 端的 xDS 請求,從 Config Controller 和 Service Controller 中獲取配置和服務信息,生成響應消息發送給 Envoy。
監聽來自 Config Controller 的配置變化消息和來自 Service Controller 的服務變化消息,并將配置和服務變化內容通過 xDS 接口推送到 Envoy。(備注:目前 Pilot 未實現增量變化推送,每次變化推送的是全量配置,在網格中服務較多的情況下可能會有性能問題)。
Pilot-Discovery 業務流程
Pilot-Disocvery 包括以下主要的幾個業務流程:
初始化 Pilot-Discovery 的各個主要組件
Pilot-Discovery 命令的入口為 pilot/cmd/pilot-discovery/main.go 中的 main 方法,在該方法中創建 Pilot Server,Server 代碼位于文件 pilot/pkg/bootstrap/server.go 中。Server 主要做了下面一些初始化工作:
創建并初始化 Config Controller。
創建并初始化 Service Controller。
創建并初始化 Discovery Server,Pilot 中創建了基于 Envoy V1 API 的 HTTP Discovery Server 和基于 Envoy V2 API 的 GPRC Discovery Server。由于 V1 已經被廢棄,本文將主要分析 V2 API 的 gRPC Discovery Server。
將 Discovery Server 注冊為 Config Controller 和 Service Controller 的 Event Handler,監聽配置和服務變化消息。
創建 gRPC Server 并接收 Envoy 的連接請求
Pilot Server 創建了一個 gRPC Server,用于監聽和接收來自 Envoy 的 xDS 請求。pilot/pkg/proxy/envoy/v2/ads.go 中的 DiscoveryServer.StreamAggregatedResources 方法被注冊為 gRPC Server 的服務處理方法。
當 gRPC Server 收到來自 Envoy 的連接時,會調用 DiscoveryServer.StreamAggregatedResources 方法,在該方法中創建一個 XdsConnection 對象,并開啟一個 goroutine 從該 connection 中接收客戶端的 xDS 請求并進行處理;如果控制面的配置發生變化,Pilot 也會通過該 connection 把配置變化主動推送到 Envoy 端。
配置變化后向 Envoy 推送更新
這是 Pilot 中最復雜的一個業務流程,主要是因為代碼中采用了多個 channel 和 queue 對變化消息進行合并和轉發。該業務流程如下:
Config Controller 或者 Service Controller 在配置或服務發生變化時通過回調方法通知 Discovery Server,Discovery Server 將變化消息放入到 Push Channel 中。
Discovery Server 通過一個 goroutine 從 Push Channel 中接收變化消息,將一段時間內連續發生的變化消息進行合并。如果超過指定時間沒有新的變化消息,則將合并后的消息加入到一個隊列 Push Queue 中。
另一個 goroutine 從 Push Queue 中取出變化消息,生成 XdsEvent,發送到每個客戶端連接的 Push Channel 中。
在 DiscoveryServer.StreamAggregatedResources 方法中從 Push Channel 中取出 XdsEvent,然后根據上下文生成符合 xDS 接口規范的 DiscoveryResponse,通過 gRPC 推送給 Envoy 端。(gRPC 會為每個 client 連接單獨分配一個 goroutine 來進行處理,因此不同客戶端連接的 StreamAggregatedResources 處理方法是在不同 goroutine 中處理的)
響應 Envoy 主動發起的 xDS 請求
Pilot 和 Envoy 之間建立的是一個雙向的 Streaming gRPC 服務調用,因此 Pilot 可以在配置變化時向 Envoy 推送,Envoy 也可以主動發起 xDS 調用請求獲取配置。Envoy 主動發起 xDS 請求的流程如下:
Envoy 通過創建好的 gRPC 連接發送一個 DiscoveryRequest
Discovery Server 通過一個 goroutine 從 XdsConnection 中接收來自 Envoy 的 DiscoveryRequest,并將請求發送到 ReqChannel 中
Discovery Server 的另一個 goroutine 從 ReqChannel 中接收 DiscoveryRequest,根據上下文生成符合 xDS 接口規范的 DiscoveryResponse,然后返回給 Envoy。
Discovery Server 業務處理關鍵代碼片段
下面是 Discovery Server 的關鍵代碼片段和對應的業務邏輯注解,為方便閱讀,代碼中只保留了邏輯主干,去掉了一些不重要的細節。
處理 xDS 請求和推送的關鍵代碼
該部分關鍵代碼位于 istio.io/istio/pilot/pkg/proxy/envoy/v2/ads.go 文件的 StreamAggregatedResources 方法中。StreamAggregatedResources 方法被注冊為 gRPC Server 的 handler,對于每一個客戶端連接,gRPC Server 會啟動一個 goroutine 來進行處理。
代碼中主要包含以下業務邏輯:
從 gRPC 連接中接收來自 Envoy 的 xDS 請求,并放到一個 channel reqChannel 中。
從 reqChannel 中接收 xDS 請求,根據 xDS 請求的類型構造響應并發送給 Envoy。
從 connection 的 pushChannel 中接收 Service 或者 Config 變化后的通知,構造 xDS 響應消息,將變化內容推送到 Envoy 端。
// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
......
// 創建一個 goroutine 來接收來自 Envoy 的 xDS 請求,并將請求放到 reqChannel 中
con := newXdsConnection(peerAddr, stream)
reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
go receiveThread(con, reqChannel, receiveError)
......
for {
select{
// 從 reqChannel 接收 Envoy 端主動發起的 xDS 請求
case discReq, ok := -reqChannel:
// 根據請求的類型構造相應的 xDS Response 并發送到 Envoy 端
switch discReq.TypeUrl {
case ClusterType:
err := s.pushCds(con, s.globalPushContext(), versionInfo())
case ListenerType:
err := s.pushLds(con, s.globalPushContext(), versionInfo())
case RouteType:
err := s.pushRoute(con, s.globalPushContext(), versionInfo())
case EndpointType:
err := s.pushEds(s.globalPushContext(), con, versionInfo(), nil)
}
// 從 PushChannel 接收 Service 或者 Config 變化后的通知
case pushEv := -con.pushChannel:
// 將變化內容推送到 Envoy 端
err := s.pushConnection(con, pushEv)
}
}
}
處理服務和配置變化的關鍵代碼
該部分關鍵代碼位于 istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go 文件中,用于監聽服務和配置變化消息,并將變化消息合并后通過 Channel 發送給前面提到的 StreamAggregatedResources 方法進行處理。
ConfigUpdate 是處理服務和配置變化的回調函數,service controller 和 config controller 在發生變化時會調用該方法通知 Discovery Server。
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) { inboundConfigUpdates.Increment()
// 服務或配置變化后,將一個 PushRequest 發送到 pushChannel 中
s.pushChannel - req
}
在 debounce 方法中將連續發生的 PushRequest 進行合并,如果一段時間內沒有收到新的 PushRequest,再發起推送;以避免由于服務和配置頻繁變化給系統帶來較大壓力。
// The debounce helper function is implemented to enable mocking
func debounce(ch chan *model.PushRequest, stopCh -chan struct{}, pushFn func(req *model.PushRequest)) {
......
pushWorker := func() { eventDelay := time.Since(startDebounce)
quietTime := time.Since(lastConfigUpdateTime)
// it has been too long or quiet enough
// 一段時間內沒有收到新的 PushRequest,再發起推送
if eventDelay = DebounceMax || quietTime = DebounceAfter {
if req != nil {
pushCounter++
adsLog.Infof(Push debounce stable[%d] %d: %v since last change, %v since last push, full=%v ,
pushCounter, debouncedEvents,
quietTime, eventDelay, req.Full)
free = false
go push(req)
req = nil
debouncedEvents = 0
}
} else { timeChan = time.After(DebounceAfter - quietTime)
}
}
for {
select {
......
case r := -ch:
lastConfigUpdateTime = time.Now()
if debouncedEvents == 0 { timeChan = time.After(DebounceAfter)
startDebounce = lastConfigUpdateTime
}
debouncedEvents++
// 合并連續發生的多個 PushRequest
req = req.Merge(r)
case -timeChan:
if free { pushWorker()
}
case -stopCh:
return
}
}
}
“Istio Pilot 代碼是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注丸趣 TV 網站,丸趣 TV 小編將為大家輸出更多高質量的實用文章!