久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

proxy內(nèi)部的運行邏輯是什么

182次閱讀
沒有評論

共計 9729 個字符,預計需要花費 25 分鐘才能閱讀完成。

這篇文章主要講解了“proxy 內(nèi)部的運行邏輯是什么”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著丸趣 TV 小編的思路慢慢深入,一起來研究和學習“proxy 內(nèi)部的運行邏輯是什么”吧!

linkerd2 介紹

Linkerd 由控制平面和數(shù)據(jù)平面組成:

控制平面是在所屬的 Kubernetes 命名空間(linkerd 默認情況下)中運行的一組服務(wù),這些服務(wù)可以完成匯聚遙測數(shù)據(jù),提供面向用戶的 API,并向數(shù)據(jù)平面代理提供控制數(shù)據(jù)等,它們共同驅(qū)動數(shù)據(jù)平面。

數(shù)據(jù)平面用 Rust 編寫的輕量級代理,該代理安裝在服務(wù)的每個 pod 中,并成為數(shù)據(jù)平面的一部分,它接收 Pod 的所有接入流量,并通過 initContainer 配置 iptables 正確轉(zhuǎn)發(fā)流量的攔截所有傳出流量,因為它是附加工具,并且攔截服務(wù)的所有傳入和傳出流量,所以不需要更改代碼,甚至可以將其添加到正在運行的服務(wù)中。

借用官方的圖:

proxy 由 rust 開發(fā)完成,其內(nèi)部的異步運行時采用了 Tokio 框架,服務(wù)組件用到了 tower。

本文主要關(guān)注 proxy 與 destination 組件交互相關(guān)的整體邏輯,分析 proxy 內(nèi)部的運行邏輯。

流程分析初始化

proxy 啟動后:

app::init 初始化配置

app::Main::new 創(chuàng)建主邏輯 main,

main.run_until 內(nèi)新加一任務(wù) ProxyParts::build_proxy_task。

在 ProxyParts::build_proxy_task 中會進行一系列的初始化工作,此處只關(guān)注 dst_svc,其創(chuàng)建代碼為:

 dst_svc = svc::stack(connect::svc(keepalive))
 .push(tls::client::layer(local_identity.clone()))
 .push_timeout(config.control_connect_timeout)
 .push(control::client::layer())
 .push(control::resolve::layer(dns_resolver.clone()))
 .push(reconnect::layer({ let backoff = config.control_backoff.clone();
 move |_| Ok(backoff.stream())
 }))
 .push(http_metrics::layer:: _, classify::Response ( ctl_http_metrics.clone(),
 ))
 .push(proxy::grpc::req_body_as_payload::layer().per_make())
 .push(control::add_origin::layer())
 .push_buffer_pending(
 config.destination_buffer_capacity,
 config.control_dispatch_timeout,
 )
 .into_inner()
 .make(config.destination_addr.clone())

dst_svc 一共有 2 處引用,一是 crate::resolve::Resolver 的創(chuàng)建會涉及;另一個就是 ProfilesClient 的創(chuàng)建。

Resolver

api_resolve::Resolve::new(dst_svc.clone()) 創(chuàng)建 resolver 對象

調(diào)用 outbound::resolve 創(chuàng)建 map_endpoint::Resolve 類型對象,并當做參數(shù) resolve 傳入 outbound::spawn 函數(shù)開啟出口線程

在 outbound::spawn 中,resolve 被用于創(chuàng)建負載均衡控制層,并用于后續(xù)路由控制:

let balancer_layer = svc::layers()
 .push_spawn_ready()
 .push(discover::Layer::new(
 DISCOVER_UPDATE_BUFFER_CAPACITY,
 resolve,
 ))
 .push(balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY));

在 discover::Layer::layer 中:

let from_resolve = FromResolve::new(self.resolve.clone());
let make_discover = MakeEndpoint::new(make_endpoint, from_resolve);
Buffer::new(self.capacity, make_discover)

Profiles

在 ProfilesClient::new 中調(diào)用 api::client::Destination::new(dst_svc) 創(chuàng)建 grpc 的 client 端并存于成員變量 service

接著 profiles_client 對象會被用于 inbound 和 outbound 的創(chuàng)建(省略無關(guān)代碼):

 let dst_stack = svc::stack(...)...
 .push(profiles::router::layer(
 profile_suffixes,
 profiles_client,
 dst_route_stack,
 ))
 ...

其中 profiles::router::layer 會創(chuàng)建一個 Layer 對象,并將 profiles_client 賦予 get_routes 成員。然后在 service 方法中,會調(diào)到 Layer::layer 方法,里面會創(chuàng)建一個 MakeSvc 對象,其 get_routes 成員的值即為 profiles_client。

運行

新的連接過來時,從 listen 拿到連接對象后,會交給 linkerd_proxy::transport::tls::accept::AcceptTls 的 call,然后是 linkerd2_proxy::proxy::server::Server 的 call,并最終分別調(diào)用 linkerd2_proxy_http::balance::MakeSvc::call 和 linkerd2_proxy_http::profiles::router::MakeSvc::call 方法。

balance

在 linkerd2_proxy_http::balance::MakeSvc::call 中:

調(diào)用 inner.call(target),此處的 inner 即是前面 Buffer::new 的結(jié)果。

生成一個新的 linkerd2_proxy_http::balance::MakeSvc 對象,當做 Future 返回

先看 inner.call。它內(nèi)部經(jīng)過層層調(diào)用,依次觸發(fā) Buffer、MakeEndpoint、FromResolve 等結(jié)構(gòu)的 call 方法,最終會觸發(fā)最開始創(chuàng)建的 resolve.resolve(target),其內(nèi)部調(diào)用 api_resolve::Resolve::call。

在 api_resolve::Resolve::call 中:

 fn call(mut self, target: T) -  Self::Future { let path = target.to_string();
 trace!(resolve {:?} , path);
 self.service
 // GRPC 請求,獲取 k8s 的 endpoint
 .get(grpc::Request::new(api::GetDestination {
 path,
 scheme: self.scheme.clone(),
 context_token: self.context_token.clone(),
 }))
 .map(|rsp| { debug!(metadata = ?rsp.metadata());
 //  拿到結(jié)果 stream
 Resolution { inner: rsp.into_inner(),
 }
 })
 }

將返回的 Resolution 再次放入 MakeSvc 中,然后看其 poll:

 fn poll(mut self) -  Poll Self::Item, Self::Error  {
 //  這個 poll 會依次調(diào)用:
 // linkerd2_proxy_api_resolve::resolve::Resolution::poll
 // linkerd2_proxy_discover::from_resolve::DiscoverFuture::poll
 // linkerd2_proxy_discover::make_endpoint::DiscoverFuture::poll
 //  最終獲得 Poll Change SocketAddr, Endpoint  
 let discover = try_ready!(self.inner.poll());
 let instrument = PendingUntilFirstData::default();
 let loaded = PeakEwmaDiscover::new(discover, self.default_rtt, self.decay, instrument);
 let balance = Balance::new(loaded, self.rng.clone());
 Ok(Async::Ready(balance))
 }

最終返回 service Balance。

當具體請求過來后,先會判斷 Balance::poll_ready:

 fn poll_ready(mut self) -  Poll (), Self::Error  {
 //  獲取 Update Endpoint 
 //  將 Remove 的從 self.ready_services 中刪掉
 //  將 Insert 的構(gòu)造 UnreadyService 結(jié)構(gòu)加到 self.unready_services
 self.poll_discover()?;
 //  對 UnreadyService,調(diào)用其 poll,內(nèi)部會調(diào)用到 svc 的 poll_ready 判斷 endpoint 是否可用
 //  可用時,將其加入 self.ready_services
 self.poll_unready();
 
 loop { if let Some(index) = self.next_ready_index {
 //  找到對應(yīng)的 endpoint,可用則返回
 if let Ok(Async::Ready(())) = self.poll_ready_index_or_evict(index) { return Ok(Async::Ready(()));
 }
 }
 //  選擇負載比較低的 endpoint
 self.next_ready_index = self.p2c_next_ready_index();
 if self.next_ready_index.is_none() {
 // 
 return Ok(Async::NotReady);
 }
 }
 }

就緒后,對請求 req 調(diào)用 call:

 fn call(mut self, request: Req) -  Self::Future {
 //  找到下一個可用的 svc,并將其從 ready_services 中刪除
 let index = self.next_ready_index.take().expect( not ready 
 let (key, mut svc) = self
 .ready_services
 .swap_remove_index(index)
 .expect( invalid ready index 
 //  將請求轉(zhuǎn)過去
 let fut = svc.call(request);
 //  加到 unready
 self.push_unready(key, svc);
 fut.map_err(Into::into)
 }

profiles

在 linkerd2_proxy_http::profiles::router::MakeSvc::call 中:

 // Initiate a stream to get route and dst_override updates for this
 // destination.
 let route_stream = match target.get_destination() { Some(ref dst) =  { if self.suffixes.iter().any(|s| s.contains(dst.name())) { debug!( fetching routes for {:?} , dst);
 self.get_routes.get_routes(dst)
 } else { debug!( skipping route discovery for dst={:?} , dst);
 None
 }
 }
 None =  {
 debug!( no destination for routes 
 None
 }
 };

經(jīng)過若干判斷后,會調(diào)用 ProfilesClient::get_routes 并將結(jié)果存于 route_stream。

進入 get_routes:

 fn get_routes(self, dst:  NameAddr) -  Option Self::Stream  {
 //  創(chuàng)建通道
 let (tx, rx) = mpsc::channel(1);
 // This oneshot allows the daemon to be notified when the Self::Stream
 // is dropped.
 let (hangup_tx, hangup_rx) = oneshot::channel();
 //  創(chuàng)建 Daemon 對象(Future 任務(wù)) let daemon = Daemon {
 tx,
 hangup: hangup_rx,
 dst: format!({} , dst),
 state: State::Disconnected,
 service: self.service.clone(),
 backoff: self.backoff,
 context_token: self.context_token.clone(),
 };
 //  調(diào)用 Daemon::poll
 let spawn = DefaultExecutor::current().spawn(Box::new(daemon.map_err(|_| ())));
 //  將通道接收端傳出
 spawn.ok().map(|_| Rx {
 rx,
 _hangup: hangup_tx,
 })
 }

接著看 Daemon::poll:

 fn poll(mut self) -  Poll Self::Item, Self::Error  {
 loop {
 //  遍歷 state 成員狀態(tài)
 self.state = match self.state {
 //  未連接時
 State::Disconnected =  { match self.service.poll_ready() { Ok(Async::NotReady) =  return Ok(Async::NotReady),
 Ok(Async::Ready(())) =  {}
 Err(err) =  {
 error!(  profile service unexpected error (dst = {}): {:?} ,
 self.dst, err,
 );
 return Ok(Async::Ready(()));
 }
 };
 //  構(gòu)造 grpc 請求
 let req = api::GetDestination { scheme:  k8s .to_owned(),
 path: self.dst.clone(),
 context_token: self.context_token.clone(),
 };
 debug!(getting profile: {:?} , req);
 //  獲取請求任務(wù)
 let rspf = self.service.get_profile(grpc::Request::new(req));
 State::Waiting(rspf)
 }
 //  正在請求時,從請求中獲取回復
 State::Waiting(ref mut f) =  match f.poll() { Ok(Async::NotReady) =  return Ok(Async::NotReady),
 //  正常回復
 Ok(Async::Ready(rsp)) =  {
 trace!( response received 
 //  流式回復
 State::Streaming(rsp.into_inner())
 }
 Err(e) =  { warn!( error fetching profile for {}: {:?} , self.dst, e);
 State::Backoff(Delay::new(clock::now() + self.backoff))
 }
 },
 //  接收回復
 State::Streaming(ref mut s) =  {
 //  處理回復流
 //  注意此處,參數(shù) 1 是 get_profile 請求的回復流, //  參數(shù) 2 是之前創(chuàng)建的通道發(fā)送端
 match Self::proxy_stream(s,  mut self.tx,  mut self.hangup) { Async::NotReady =  return Ok(Async::NotReady),
 Async::Ready(StreamState::SendLost) =  return Ok(().into()),
 Async::Ready(StreamState::RecvDone) =  { State::Backoff(Delay::new(clock::now() + self.backoff))
 }
 }
 }
 //  異常,結(jié)束請求
 State::Backoff(ref mut f) =  match f.poll() { Ok(Async::NotReady) =  return Ok(Async::NotReady),
 Err(_) | Ok(Async::Ready(())) =  State::Disconnected,
 },
 };
 }
 }

接著 proxy_stream:

 fn proxy_stream(
 rx:  mut grpc::Streaming api::DestinationProfile, T::ResponseBody ,
 tx:  mut mpsc::Sender profiles::Routes ,
 hangup:  mut oneshot::Receiver Never ,
 ) -  Async StreamState  {
 loop {
 //  發(fā)送端是否就緒
 match tx.poll_ready() { Ok(Async::NotReady) =  return Async::NotReady,
 Ok(Async::Ready(())) =  {}
 Err(_) =  return StreamState::SendLost.into(),
 }
 //  從 grpc stream 中取得一條數(shù)據(jù)
 match rx.poll() { Ok(Async::NotReady) =  match hangup.poll() { Ok(Async::Ready(never)) =  match never {}, // unreachable!
 Ok(Async::NotReady) =  {
 // We are now scheduled to be notified if the hangup tx
 // is dropped.
 return Async::NotReady;
 }
 Err(_) =  {
 // Hangup tx has been dropped.
 debug!( profile stream cancelled 
 return StreamState::SendLost.into();
 }
 },
 Ok(Async::Ready(None)) =  return StreamState::RecvDone.into(),
 //  正確取得 profile 結(jié)構(gòu)
 Ok(Async::Ready(Some(profile))) =  { debug!( profile received: {:?} , profile);
 //  解析數(shù)據(jù)
 let retry_budget = profile.retry_budget.and_then(convert_retry_budget);
 let routes = profile
 .routes
 .into_iter()
 .filter_map(move |orig| convert_route(orig, retry_budget.as_ref()))
 .collect();
 let dst_overrides = profile
 .dst_overrides
 .into_iter()
 .filter_map(convert_dst_override)
 .collect();
 //  構(gòu)造 profiles::Routes 結(jié)構(gòu)并推到發(fā)送端
 match tx.start_send(profiles::Routes {
 routes,
 dst_overrides,
 }) { Ok(AsyncSink::Ready) =  {} // continue
 Ok(AsyncSink::NotReady(_)) =  {
 info!( dropping profile update due to a full buffer 
 // This must have been because another task stole
 // our tx slot? It seems pretty unlikely, but possible?
 return Async::NotReady;
 }
 Err(_) =  { return StreamState::SendLost.into();
 }
 }
 }
 Err(e) =  { warn!( profile stream failed: {:?} , e);
 return StreamState::RecvDone.into();
 }
 }
 }
 }

回到 MakeSvc::call 方法,前面創(chuàng)建的 route_stream 會被用于創(chuàng)建一個 linkerd2_proxy::proxy::http::profiles::router::Service 任務(wù)對象,并在其 poll_ready 方法中通過 poll_route_stream 從 route_steam 獲取 profiles::Routes 并調(diào)用 update_routes 創(chuàng)建具體可用的路由規(guī)則 linkerd2_router::Router,至此,路由規(guī)則已建好,就等具體的請求過來然后在 call 中調(diào)用 linkerd2_router::call 進行對請求的路由判斷。

圖示 profile

感謝各位的閱讀,以上就是“proxy 內(nèi)部的運行邏輯是什么”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對 proxy 內(nèi)部的運行邏輯是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是丸趣 TV,丸趣 TV 小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-16發(fā)表,共計9729字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 天等县| 巴林左旗| 东乡族自治县| 元朗区| 江山市| 丹阳市| 靖州| 岳普湖县| 黄梅县| 阳城县| 比如县| 资源县| 威海市| 平顶山市| 鲁甸县| 都昌县| 文成县| 汾西县| 瑞昌市| 鹤山市| 栾川县| 东丽区| 乡宁县| 平武县| 遵义市| 黔江区| 连州市| 丹凤县| 温宿县| 西和县| 花垣县| 库车县| 新泰市| 台安县| 台湾省| 中阳县| 宜阳县| 台前县| 福贡县| 塔河县| 兴化市|