共計 9729 個字符,預計需要花費 25 分鐘才能閱讀完成。
這篇文章主要講解了“proxy 內(nèi)部的運行邏輯是什么”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著丸趣 TV 小編的思路慢慢深入,一起來研究和學習“proxy 內(nèi)部的運行邏輯是什么”吧!
linkerd2 介紹
Linkerd 由控制平面和數(shù)據(jù)平面組成:
控制平面是在所屬的 Kubernetes 命名空間(linkerd 默認情況下)中運行的一組服務(wù),這些服務(wù)可以完成匯聚遙測數(shù)據(jù),提供面向用戶的 API,并向數(shù)據(jù)平面代理提供控制數(shù)據(jù)等,它們共同驅(qū)動數(shù)據(jù)平面。
數(shù)據(jù)平面用 Rust 編寫的輕量級代理,該代理安裝在服務(wù)的每個 pod 中,并成為數(shù)據(jù)平面的一部分,它接收 Pod 的所有接入流量,并通過 initContainer 配置 iptables 正確轉(zhuǎn)發(fā)流量的攔截所有傳出流量,因為它是附加工具,并且攔截服務(wù)的所有傳入和傳出流量,所以不需要更改代碼,甚至可以將其添加到正在運行的服務(wù)中。
借用官方的圖:
proxy 由 rust 開發(fā)完成,其內(nèi)部的異步運行時采用了 Tokio 框架,服務(wù)組件用到了 tower。
本文主要關(guān)注 proxy 與 destination 組件交互相關(guān)的整體邏輯,分析 proxy 內(nèi)部的運行邏輯。
流程分析初始化
proxy 啟動后:
app::init 初始化配置
app::Main::new 創(chuàng)建主邏輯 main,
main.run_until 內(nèi)新加一任務(wù) ProxyParts::build_proxy_task。
在 ProxyParts::build_proxy_task 中會進行一系列的初始化工作,此處只關(guān)注 dst_svc,其創(chuàng)建代碼為:
dst_svc = svc::stack(connect::svc(keepalive))
.push(tls::client::layer(local_identity.clone()))
.push_timeout(config.control_connect_timeout)
.push(control::client::layer())
.push(control::resolve::layer(dns_resolver.clone()))
.push(reconnect::layer({ let backoff = config.control_backoff.clone();
move |_| Ok(backoff.stream())
}))
.push(http_metrics::layer:: _, classify::Response ( ctl_http_metrics.clone(),
))
.push(proxy::grpc::req_body_as_payload::layer().per_make())
.push(control::add_origin::layer())
.push_buffer_pending(
config.destination_buffer_capacity,
config.control_dispatch_timeout,
)
.into_inner()
.make(config.destination_addr.clone())
dst_svc 一共有 2 處引用,一是 crate::resolve::Resolver 的創(chuàng)建會涉及;另一個就是 ProfilesClient 的創(chuàng)建。
Resolver
api_resolve::Resolve::new(dst_svc.clone()) 創(chuàng)建 resolver 對象
調(diào)用 outbound::resolve 創(chuàng)建 map_endpoint::Resolve 類型對象,并當做參數(shù) resolve 傳入 outbound::spawn 函數(shù)開啟出口線程
在 outbound::spawn 中,resolve 被用于創(chuàng)建負載均衡控制層,并用于后續(xù)路由控制:
let balancer_layer = svc::layers()
.push_spawn_ready()
.push(discover::Layer::new(
DISCOVER_UPDATE_BUFFER_CAPACITY,
resolve,
))
.push(balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY));
在 discover::Layer::layer 中:
let from_resolve = FromResolve::new(self.resolve.clone());
let make_discover = MakeEndpoint::new(make_endpoint, from_resolve);
Buffer::new(self.capacity, make_discover)
Profiles
在 ProfilesClient::new 中調(diào)用 api::client::Destination::new(dst_svc) 創(chuàng)建 grpc 的 client 端并存于成員變量 service
接著 profiles_client 對象會被用于 inbound 和 outbound 的創(chuàng)建(省略無關(guān)代碼):
let dst_stack = svc::stack(...)...
.push(profiles::router::layer(
profile_suffixes,
profiles_client,
dst_route_stack,
))
...
其中 profiles::router::layer 會創(chuàng)建一個 Layer 對象,并將 profiles_client 賦予 get_routes 成員。然后在 service 方法中,會調(diào)到 Layer::layer 方法,里面會創(chuàng)建一個 MakeSvc 對象,其 get_routes 成員的值即為 profiles_client。
運行
新的連接過來時,從 listen 拿到連接對象后,會交給 linkerd_proxy::transport::tls::accept::AcceptTls 的 call,然后是 linkerd2_proxy::proxy::server::Server 的 call,并最終分別調(diào)用 linkerd2_proxy_http::balance::MakeSvc::call 和 linkerd2_proxy_http::profiles::router::MakeSvc::call 方法。
balance
在 linkerd2_proxy_http::balance::MakeSvc::call 中:
調(diào)用 inner.call(target),此處的 inner 即是前面 Buffer::new 的結(jié)果。
生成一個新的 linkerd2_proxy_http::balance::MakeSvc 對象,當做 Future 返回
先看 inner.call。它內(nèi)部經(jīng)過層層調(diào)用,依次觸發(fā) Buffer、MakeEndpoint、FromResolve 等結(jié)構(gòu)的 call 方法,最終會觸發(fā)最開始創(chuàng)建的 resolve.resolve(target),其內(nèi)部調(diào)用 api_resolve::Resolve::call。
在 api_resolve::Resolve::call 中:
fn call(mut self, target: T) - Self::Future { let path = target.to_string();
trace!(resolve {:?} , path);
self.service
// GRPC 請求,獲取 k8s 的 endpoint
.get(grpc::Request::new(api::GetDestination {
path,
scheme: self.scheme.clone(),
context_token: self.context_token.clone(),
}))
.map(|rsp| { debug!(metadata = ?rsp.metadata());
// 拿到結(jié)果 stream
Resolution { inner: rsp.into_inner(),
}
})
}
將返回的 Resolution 再次放入 MakeSvc 中,然后看其 poll:
fn poll(mut self) - Poll Self::Item, Self::Error {
// 這個 poll 會依次調(diào)用:
// linkerd2_proxy_api_resolve::resolve::Resolution::poll
// linkerd2_proxy_discover::from_resolve::DiscoverFuture::poll
// linkerd2_proxy_discover::make_endpoint::DiscoverFuture::poll
// 最終獲得 Poll Change SocketAddr, Endpoint
let discover = try_ready!(self.inner.poll());
let instrument = PendingUntilFirstData::default();
let loaded = PeakEwmaDiscover::new(discover, self.default_rtt, self.decay, instrument);
let balance = Balance::new(loaded, self.rng.clone());
Ok(Async::Ready(balance))
}
最終返回 service Balance。
當具體請求過來后,先會判斷 Balance::poll_ready:
fn poll_ready(mut self) - Poll (), Self::Error {
// 獲取 Update Endpoint
// 將 Remove 的從 self.ready_services 中刪掉
// 將 Insert 的構(gòu)造 UnreadyService 結(jié)構(gòu)加到 self.unready_services
self.poll_discover()?;
// 對 UnreadyService,調(diào)用其 poll,內(nèi)部會調(diào)用到 svc 的 poll_ready 判斷 endpoint 是否可用
// 可用時,將其加入 self.ready_services
self.poll_unready();
loop { if let Some(index) = self.next_ready_index {
// 找到對應(yīng)的 endpoint,可用則返回
if let Ok(Async::Ready(())) = self.poll_ready_index_or_evict(index) { return Ok(Async::Ready(()));
}
}
// 選擇負載比較低的 endpoint
self.next_ready_index = self.p2c_next_ready_index();
if self.next_ready_index.is_none() {
//
return Ok(Async::NotReady);
}
}
}
就緒后,對請求 req 調(diào)用 call:
fn call(mut self, request: Req) - Self::Future {
// 找到下一個可用的 svc,并將其從 ready_services 中刪除
let index = self.next_ready_index.take().expect( not ready
let (key, mut svc) = self
.ready_services
.swap_remove_index(index)
.expect( invalid ready index
// 將請求轉(zhuǎn)過去
let fut = svc.call(request);
// 加到 unready
self.push_unready(key, svc);
fut.map_err(Into::into)
}
profiles
在 linkerd2_proxy_http::profiles::router::MakeSvc::call 中:
// Initiate a stream to get route and dst_override updates for this
// destination.
let route_stream = match target.get_destination() { Some(ref dst) = { if self.suffixes.iter().any(|s| s.contains(dst.name())) { debug!( fetching routes for {:?} , dst);
self.get_routes.get_routes(dst)
} else { debug!( skipping route discovery for dst={:?} , dst);
None
}
}
None = {
debug!( no destination for routes
None
}
};
經(jīng)過若干判斷后,會調(diào)用 ProfilesClient::get_routes 并將結(jié)果存于 route_stream。
進入 get_routes:
fn get_routes(self, dst: NameAddr) - Option Self::Stream {
// 創(chuàng)建通道
let (tx, rx) = mpsc::channel(1);
// This oneshot allows the daemon to be notified when the Self::Stream
// is dropped.
let (hangup_tx, hangup_rx) = oneshot::channel();
// 創(chuàng)建 Daemon 對象(Future 任務(wù)) let daemon = Daemon {
tx,
hangup: hangup_rx,
dst: format!({} , dst),
state: State::Disconnected,
service: self.service.clone(),
backoff: self.backoff,
context_token: self.context_token.clone(),
};
// 調(diào)用 Daemon::poll
let spawn = DefaultExecutor::current().spawn(Box::new(daemon.map_err(|_| ())));
// 將通道接收端傳出
spawn.ok().map(|_| Rx {
rx,
_hangup: hangup_tx,
})
}
接著看 Daemon::poll:
fn poll(mut self) - Poll Self::Item, Self::Error {
loop {
// 遍歷 state 成員狀態(tài)
self.state = match self.state {
// 未連接時
State::Disconnected = { match self.service.poll_ready() { Ok(Async::NotReady) = return Ok(Async::NotReady),
Ok(Async::Ready(())) = {}
Err(err) = {
error!( profile service unexpected error (dst = {}): {:?} ,
self.dst, err,
);
return Ok(Async::Ready(()));
}
};
// 構(gòu)造 grpc 請求
let req = api::GetDestination { scheme: k8s .to_owned(),
path: self.dst.clone(),
context_token: self.context_token.clone(),
};
debug!(getting profile: {:?} , req);
// 獲取請求任務(wù)
let rspf = self.service.get_profile(grpc::Request::new(req));
State::Waiting(rspf)
}
// 正在請求時,從請求中獲取回復
State::Waiting(ref mut f) = match f.poll() { Ok(Async::NotReady) = return Ok(Async::NotReady),
// 正常回復
Ok(Async::Ready(rsp)) = {
trace!( response received
// 流式回復
State::Streaming(rsp.into_inner())
}
Err(e) = { warn!( error fetching profile for {}: {:?} , self.dst, e);
State::Backoff(Delay::new(clock::now() + self.backoff))
}
},
// 接收回復
State::Streaming(ref mut s) = {
// 處理回復流
// 注意此處,參數(shù) 1 是 get_profile 請求的回復流, // 參數(shù) 2 是之前創(chuàng)建的通道發(fā)送端
match Self::proxy_stream(s, mut self.tx, mut self.hangup) { Async::NotReady = return Ok(Async::NotReady),
Async::Ready(StreamState::SendLost) = return Ok(().into()),
Async::Ready(StreamState::RecvDone) = { State::Backoff(Delay::new(clock::now() + self.backoff))
}
}
}
// 異常,結(jié)束請求
State::Backoff(ref mut f) = match f.poll() { Ok(Async::NotReady) = return Ok(Async::NotReady),
Err(_) | Ok(Async::Ready(())) = State::Disconnected,
},
};
}
}
接著 proxy_stream:
fn proxy_stream(
rx: mut grpc::Streaming api::DestinationProfile, T::ResponseBody ,
tx: mut mpsc::Sender profiles::Routes ,
hangup: mut oneshot::Receiver Never ,
) - Async StreamState {
loop {
// 發(fā)送端是否就緒
match tx.poll_ready() { Ok(Async::NotReady) = return Async::NotReady,
Ok(Async::Ready(())) = {}
Err(_) = return StreamState::SendLost.into(),
}
// 從 grpc stream 中取得一條數(shù)據(jù)
match rx.poll() { Ok(Async::NotReady) = match hangup.poll() { Ok(Async::Ready(never)) = match never {}, // unreachable!
Ok(Async::NotReady) = {
// We are now scheduled to be notified if the hangup tx
// is dropped.
return Async::NotReady;
}
Err(_) = {
// Hangup tx has been dropped.
debug!( profile stream cancelled
return StreamState::SendLost.into();
}
},
Ok(Async::Ready(None)) = return StreamState::RecvDone.into(),
// 正確取得 profile 結(jié)構(gòu)
Ok(Async::Ready(Some(profile))) = { debug!( profile received: {:?} , profile);
// 解析數(shù)據(jù)
let retry_budget = profile.retry_budget.and_then(convert_retry_budget);
let routes = profile
.routes
.into_iter()
.filter_map(move |orig| convert_route(orig, retry_budget.as_ref()))
.collect();
let dst_overrides = profile
.dst_overrides
.into_iter()
.filter_map(convert_dst_override)
.collect();
// 構(gòu)造 profiles::Routes 結(jié)構(gòu)并推到發(fā)送端
match tx.start_send(profiles::Routes {
routes,
dst_overrides,
}) { Ok(AsyncSink::Ready) = {} // continue
Ok(AsyncSink::NotReady(_)) = {
info!( dropping profile update due to a full buffer
// This must have been because another task stole
// our tx slot? It seems pretty unlikely, but possible?
return Async::NotReady;
}
Err(_) = { return StreamState::SendLost.into();
}
}
}
Err(e) = { warn!( profile stream failed: {:?} , e);
return StreamState::RecvDone.into();
}
}
}
}
回到 MakeSvc::call 方法,前面創(chuàng)建的 route_stream 會被用于創(chuàng)建一個 linkerd2_proxy::proxy::http::profiles::router::Service 任務(wù)對象,并在其 poll_ready 方法中通過 poll_route_stream 從 route_steam 獲取 profiles::Routes 并調(diào)用 update_routes 創(chuàng)建具體可用的路由規(guī)則 linkerd2_router::Router,至此,路由規(guī)則已建好,就等具體的請求過來然后在 call 中調(diào)用 linkerd2_router::call 進行對請求的路由判斷。
圖示 profile
感謝各位的閱讀,以上就是“proxy 內(nèi)部的運行邏輯是什么”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對 proxy 內(nèi)部的運行邏輯是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是丸趣 TV,丸趣 TV 小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!