久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

TARS C++客戶端是什么

共計(jì) 22245 個(gè)字符,預(yù)計(jì)需要花費(fèi) 56 分鐘才能閱讀完成。

本篇內(nèi)容介紹了“TARS C++ 客戶端是什么”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓丸趣 TV 小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

什么是 TARS

TARS 是騰訊使用十年的微服務(wù)開(kāi)發(fā)框架,目前支持 C ++、Java、PHP、Node.js、Go 語(yǔ)言。該開(kāi)源項(xiàng)目為用戶提供了涉及到開(kāi)發(fā)、運(yùn)維、以及測(cè)試的一整套微服務(wù)平臺(tái) PaaS 解決方案,幫助一個(gè)產(chǎn)品或者服務(wù)快速開(kāi)發(fā)、部署、測(cè)試、上線。目前該框架應(yīng)用在騰訊各大核心業(yè)務(wù),基于該框架部署運(yùn)行的服務(wù)節(jié)點(diǎn)規(guī)模達(dá)到數(shù)十萬(wàn)。TARS 的通信模型中包含客戶端和服務(wù)端。客戶端服務(wù)端之間主要是利用 RPC 進(jìn)行通信。本系列文章分上下兩篇,對(duì) RPC 調(diào)用部分進(jìn)行源碼解析。本文是上篇,我們將以 C ++ 語(yǔ)言為載體,帶大家了解一下 TARS 的客戶端。

初識(shí)客戶端

TARS 的客戶端最重要的類是 Communicator,一個(gè)客戶端只能聲明出一個(gè) Communicator 類實(shí)例,用戶可以通過(guò) CommunicatorPtr Application::getCommunicator() 獲取線程安全的 Communicator 類單例。Communicator 類聚合了兩個(gè)比較重要的類,一個(gè)是 CommunicatorEpoll,負(fù)責(zé)網(wǎng)絡(luò)線程的建立與通過(guò) ObjectProxyFactory 生成 ObjectProxy;另一個(gè)是 ServantProxyFactory,生成不同的 RPC 服務(wù)句柄,即 ServantProxy,用戶通過(guò) ServantProxy 調(diào)用 RPC 服務(wù)。下面簡(jiǎn)單介紹幾個(gè)類的作用。

Communicator

一個(gè) Communicator 實(shí)例就是一個(gè)客戶端,負(fù)責(zé)與服務(wù)端建立連接,生成 RPC 服務(wù)句柄,可以通過(guò) CommunicatorPtr Application::getCommunicator() 獲取 Communicator 實(shí)例,用戶最后不要自己聲明定義新的 Communicator 實(shí)例。

ServantProxy 與 ServantProxyFactory

ServantProxy 就是一個(gè)服務(wù)代理,ServantProxy 可以通過(guò) ServantProxyFactory 工廠類生成,用戶往往通過(guò) Communicator 的 template class T void stringToProxy() 接口間接調(diào)用 ServantProxyFactory 的 ServantPrx::element_type* getServantProxy() 接口以獲取服務(wù)代理,通過(guò)服務(wù)代理 ServantProxy,用戶就可以進(jìn)行 RPC 調(diào)用了。ServantProxy 內(nèi)含多個(gè)服務(wù)實(shí)體 ObjectProxy(詳見(jiàn)下文第 4 小點(diǎn)),能夠幫助用戶在同一個(gè)服務(wù)代理內(nèi)進(jìn)行負(fù)載均衡。

CommunicatorEpoll

CommunicatorEpoll 類代表客戶端的網(wǎng)絡(luò)模塊,內(nèi)含 TC_Epoller 作為 IO 復(fù)用,能夠同時(shí)處理不同主調(diào)線程(caller 線程)的多個(gè)請(qǐng)求。CommunicatorEpoll 內(nèi)含服務(wù)實(shí)體工廠類 ObjectProxyFactory(詳見(jiàn)下文第 4 小點(diǎn)),意味著在同一網(wǎng)絡(luò)線程中,能夠產(chǎn)生不同服務(wù)的實(shí)體,能夠完成不同的 RPC 服務(wù)調(diào)用。CommunicatorEpoll 還聚合了異步調(diào)用處理線程 AsyncProcThread,負(fù)責(zé)接收到異步的響應(yīng)包之后,將響應(yīng)包交給該線程處理。

ObjectProxy 與 ObjectProxyFactory

ObjectProxy 類是一個(gè)服務(wù)實(shí)體,注意與 ServantProxy 類是一個(gè)服務(wù)代理相區(qū)別,前者表示一個(gè)網(wǎng)絡(luò)線程上的某個(gè)服務(wù)實(shí)體 A,后者表示對(duì)所有網(wǎng)絡(luò)線程上的某服務(wù)實(shí)體 A 的總代理,更詳細(xì)的介紹可見(jiàn)下文。ObjectProxy 通過(guò) ObjectProxyFactory 生成,而 ObjectProxyFactory 類的實(shí)例是 CommunicatorEpoll 的成員變量,意味著一個(gè)網(wǎng)絡(luò)線程 CommunicatorEpoll 能夠產(chǎn)生各種各樣的服務(wù)實(shí)體 ObjectProxy,發(fā)起不同的 RPC 服務(wù)。ObjectProxy 通過(guò) AdapterProxy 來(lái)管理對(duì)服務(wù)端的連接。好了,介紹完所有的類之后,先通過(guò)類圖理一理他們之間的關(guān)系,這個(gè)類圖在之后的文章中將會(huì)再次出現(xiàn)。

TARS 的客戶端最重要的類是 Communicator,一個(gè)客戶端只能聲明出一個(gè) Communicator 類實(shí)例,用戶可以通過(guò) CommunicatorPtr Application::getCommunicator() 獲取線程安全的 Communicator 類單例。Communicator 類聚合了兩個(gè)比較重要的類,一個(gè)是 CommunicatorEpoll,負(fù)責(zé)網(wǎng)絡(luò)線程的建立與通過(guò) ObjectProxyFactory 生成 ObjectProxy;另一個(gè)是 ServantProxyFactory,生成不同的 RPC 服務(wù)句柄,即 ServantProxy,用戶通過(guò) ServantProxy 調(diào)用 RPC 服務(wù)。根據(jù)用戶配置,Communicator 擁有 n 個(gè)網(wǎng)絡(luò)線程,即 n 個(gè) CommunicatorEpoll。每個(gè) CommunicatorEpoll 擁有一個(gè) ObjectProxyFactory 類,每個(gè) ObjectProxyFactory 可以生成一系列的不同服務(wù)的實(shí)體對(duì)象 ObjectProxy,因此,假如 Communicator 擁有兩個(gè) CommunicatorEpoll,并有 foo 與 bar 這兩類不同的服務(wù)實(shí)體對(duì)象,那么如下圖(1-2)所示,每個(gè) CommunicatorEpoll 可以通過(guò) ObjectProxyFactory 創(chuàng)建兩類 ObjectProxy,這是 TARS 客戶端的第一層負(fù)載均衡,每個(gè)線程都可以分擔(dān)所有服務(wù)的 RPC 請(qǐng)求,因此,一個(gè)服務(wù)的阻塞可能會(huì)影響其他服務(wù),因?yàn)榫W(wǎng)絡(luò)線程是多個(gè)服務(wù)實(shí)體 ObjectProxy 所共享的。

Communicator 類下另一個(gè)比較重要的 ServantProxyFactory 類的作用是依據(jù)實(shí)際服務(wù)端的信息(如服務(wù)器的 socket 標(biāo)志)與 Communicator 中客戶端的信息(如網(wǎng)絡(luò)線程數(shù))而生成 ServantProxy 句柄,通過(guò)句柄調(diào)用 RPC 服務(wù)。舉個(gè)例子,如下圖(1-3)所示,Communicator 實(shí)例通過(guò) ServantProxyFactory 成員變量的 getServantProxy() 接口在構(gòu)造 fooServantProxy 句柄的時(shí)候,會(huì)獲取 Communicator 實(shí)例下的所有 CommunicatorEpoll(即 CommunicatorEpoll- 1 與 CommunicatorEpoll-2)中的 fooObjectProxy(即 fooObjectProxy- 1 與 fooObjectProxy-2),并作為構(gòu)造 fooServantProxy 的參數(shù)。Communicator 通過(guò) ServantProxyFactory 能夠獲取 foo 與 bar 這兩類 ServantProxy,ServantProxy 與相應(yīng)的 ObjectProxy 存在相應(yīng)的聚合關(guān)系:

另外,每個(gè) ObjectProxy 都擁有一個(gè) EndpointManager,例如,fooObjectProxy 的 EndpointManager 管理 fooObjectProxy 下面的所有 fooAdapterProxy,每個(gè) AdapterProxy 連接到一個(gè)提供相應(yīng) foo 服務(wù)的服務(wù)端物理機(jī) socket 上。通過(guò) EndpointManager 還可以以不同的負(fù)載均衡方式獲取連接 AdapterProxy。假如 foo 服務(wù)有兩臺(tái)物理機(jī),bar 服務(wù)有一臺(tái)物理機(jī),那么 ObjectProxy,EndpointManager 與 AdapterProxy 關(guān)系如下圖(1-4)所示。上面提到,不同的網(wǎng)絡(luò)線程 CommunicatorEpoll 均可以發(fā)起同一 RPC 請(qǐng)求,對(duì)于同一 RPC 服務(wù),選取不同的 ObjectProxy(或可認(rèn)為選取不同的網(wǎng)絡(luò)線程 CommunicatorEpoll)是第一層的負(fù)載均衡,而對(duì)于同一個(gè)被選中的 ObjectProxy,通過(guò) EndpointManager 選擇不同的 socket 連接 AdapterProxy(假如 ObjectProxy 有大于 1 個(gè)的 AdapterProxy,如圖(1-4)的 fooObjectProxy)是第二層的負(fù)載均衡。

在客戶端進(jìn)行初始化時(shí),必須建立上面介紹的關(guān)系,因此相應(yīng)的類圖如圖(1-5)所示,通過(guò)類圖可以看出各類的關(guān)系,以及初始化需要用到的函數(shù)。

初始化代碼

現(xiàn)在,通過(guò)代碼跟蹤來(lái)看看,在客戶端初始化過(guò)程中,各個(gè)類是如何被初始化出來(lái)并建立上述的架構(gòu)關(guān)系的。在簡(jiǎn)述之前,可以先看看函數(shù)的調(diào)用流程圖,若看不清晰,可以將圖片保存下來(lái),用看圖軟件放大查看,強(qiáng)烈建議結(jié)合文章的代碼解析以 TARS 源碼一起查看,文章后面的所有代碼流程圖均如此。接下來(lái),將會(huì)按照函數(shù)調(diào)用流程圖來(lái)一步一步分析客戶端代理是如何被初始化出來(lái)的:

1. 執(zhí)行 stringToProxy

在客戶端程序中,一開(kāi)始會(huì)執(zhí)行下面的代碼進(jìn)行整個(gè)客戶端代理的初始化:

Communicator comm;
HelloPrx prx;
comm.stringToProxy(TestApp.HelloServer.HelloObj@tcp -h 1.1.1.1 -p 20001  , prx);

先聲明一個(gè) Communicator 變量 comm(其實(shí)不建議這么做)以及一個(gè) ServantProxy 類的指針變量 prx,在此處,服務(wù)為 Hello,因此聲明一個(gè) HelloPrx prx。注意一個(gè)客戶端只能擁有一個(gè) Communicator。為了能夠獲得 RPC 的服務(wù)句柄,我們調(diào)用 Communicator::stringToProxy(),并傳入服務(wù)端的信息與 prx 變量,函數(shù)返回后,prx 就是 RPC 服務(wù)的句柄。進(jìn)入 Communicator::stringToProxy() 函數(shù)中,我們通過(guò) Communicator::getServantProxy() 來(lái)依據(jù) objectName 與 setName 獲取服務(wù)代理 ServantProxy:

/**
*  生成代理
* @param T
* @param objectName
* @param setName  指定 set 調(diào)用的 setid
* @param proxy
template class T  void stringToProxy(const string  objectName, T  proxy,const string  setName=)
 ServantProxy * pServantProxy = getServantProxy(objectName,setName);
 proxy = (typename T::element_type*)(pServantProxy);
}

2. 執(zhí)行 Communicator 的初始化函數(shù)

進(jìn)入 Communicator::getServantProxy(),首先會(huì)執(zhí)行 Communicator::initialize() 來(lái)初始化 Communicator,需要注意一點(diǎn),Communicator:: initialize() 只會(huì)被執(zhí)行一次,下一次執(zhí)行 Communicator::getServantProxy() 將不會(huì)再次執(zhí)行 Communicator:: initialize() 函數(shù):

void Communicator::initialize()
 TC_LockT TC_ThreadRecMutex  lock(*this);
 if (_initialized) // 已經(jīng)被初始化則直接返回
 return;
 ......
}

進(jìn)入 Communicator::initialize() 函數(shù)中,在這里,將會(huì) new 出上文介紹的與 Communicator 密切相關(guān)的類 ServantProxyFactory 與 n 個(gè) CommunicatorEpoll,n 為客戶端的網(wǎng)絡(luò)線程數(shù),最小為 1,最大為 MAX_CLIENT_THREAD_NUM:

void Communicator::initialize()
 ......
 _servantProxyFactory = new ServantProxyFactory(this);
 ......
 for(size_t i = 0; i   _clientThreadNum; ++i)
 { _communicatorEpoll[i] = new CommunicatorEpoll(this, i);
 _communicatorEpoll[i]- start(); // 啟動(dòng)網(wǎng)絡(luò)線程
 }
 ......
}

在 CommunicatorEpoll 的構(gòu)造函數(shù)中,ObjectProxyFactory 被創(chuàng)建出來(lái),這是構(gòu)造圖(1-2)關(guān)系的前提。除此之外,還可以看到獲取相應(yīng)配置,創(chuàng)建并啟動(dòng)若干個(gè)異步回調(diào)后的處理線程。創(chuàng)建完成后,調(diào)用 CommunicatorEpoll::start() 啟動(dòng)網(wǎng)絡(luò)線程。至此,Communicator::initialize() 順利執(zhí)行。通過(guò)下圖回顧上面的過(guò)程:

3. 嘗試獲取 ServantProxy

代碼回到 Communicator::getServantProxy() 中 Communicator::getServantProxy() 會(huì)執(zhí)行 ServantProxyFactory::getServantProxy() 并返回相應(yīng)的服務(wù)代理:

ServantProxy* Communicator::getServantProxy(const string  objectName, const string  setName)
 ……
 return _servantProxyFactory- getServantProxy(objectName,setName);
}

進(jìn)入 ServantProxyFactory::getServantProxy(),首先會(huì)加鎖,從 map string, ServantPrx _servantProxy 中查找目標(biāo),若查找成功直接返回。若查找失敗,TARS 需要構(gòu)造出相應(yīng)的 ServantProxy,ServantProxy 的構(gòu)造需要如圖(1-3)所示的相對(duì)應(yīng)的 ObjectProxy 作為構(gòu)造函數(shù)的參數(shù),由此可見(jiàn),在 ServantProxyFactory::getServantProxy() 中有如下獲取 ObjectProxy 指針數(shù)組的代碼:

ObjectProxy ** ppObjectProxy = new ObjectProxy * [_comm- getClientThreadNum()];
assert(ppObjectProxy != NULL);
for(size_t i = 0; i   _comm- getClientThreadNum(); ++i)
 ppObjectProxy[i] = _comm- getCommunicatorEpoll(i)- getObjectProxy(name, setName);
}

4. 獲取 ObjectProxy

代碼來(lái)到 ObjectProxyFactory::getObjectProxy(),同樣,會(huì)首先加鎖,再?gòu)?map string,ObjectProxy* _objectProxys 中查找是否已經(jīng)擁有目標(biāo) ObjectProxy,若查找成功直接返回。若查找失敗,需要新建一個(gè)新的 ObjectProxy,通過(guò)類圖可知,ObjectProxy 需要一個(gè) CommunicatorEpoll 對(duì)象進(jìn)行初始化,由此關(guān)聯(lián)管理自己的 CommunicatorEpoll,CommunicatorEpoll 之后便可以通過(guò) getObjectProxy() 接口獲取屬于自己的 ObjectProxy。詳細(xì)過(guò)程可見(jiàn)下圖:

5. 建立 ObjectProxy 與 AdapterProxy 的關(guān)系

新建 ObjectProxy 的過(guò)程同樣非常值得關(guān)注,在 ObjectProxy::ObjectProxy() 中,關(guān)鍵代碼是:

_endpointManger.reset(new EndpointManager(this, _communicatorEpoll- getCommunicator(), sObjectProxyName, pCommunicatorEpoll- isFirstNetThread(), setName));

每個(gè) ObjectProxy 都有屬于自己的 EndpointManager 負(fù)責(zé)管理到服務(wù)端的所有 socket 連接 AdapterProxy,每個(gè) AdapterProxy 連接到一個(gè)提供相應(yīng)服務(wù)的服務(wù)端物理機(jī) socket 上。通過(guò) EndpointManager 還可以以不同的負(fù)載均衡方式獲取與服務(wù)器的 socket 連接 AdapterProxy。ObjectProxy:: ObjectProxy() 是圖(1-6)或者圖(1-8)中的略 1,具體的代碼流程如圖(1-9)所示。ObjectProxy 創(chuàng)建一個(gè) EndpointManager 對(duì)象,在 EndpointManager 的初始化過(guò)程中,依據(jù)客戶端提供的信息,直接創(chuàng)建連接到服務(wù)端物理機(jī)的 TCP/UDP 連接 AdapterProxy 或者從代理中獲取服務(wù)端物理機(jī) socket 列表后再創(chuàng)建 TCP/UDP 連接 AdapterProxy。

按照?qǐng)D(1-9)的程序流程執(zhí)行完成后,便會(huì)建立如圖(2-3)所示的一個(gè) ObjectProxy 對(duì)多個(gè) AdapterProxy 的關(guān)系。新建 ObjectProxy 之后,就可以調(diào)用其 ObjectProxy::initialize() 函數(shù)進(jìn)行 ObjectProxy 對(duì)象的初始化了,當(dāng)然,需要將 ObjectProxy 對(duì)象插入 ObjectProxyFactory 的成員變量_objectProxys 與_vObjectProxys 中,方便下次直接返回 ObjectProxy 對(duì)象。

6. 繼續(xù)完成 ServantProxy 的創(chuàng)建

退出層層的函數(shù)調(diào)用棧,代碼再次回 ServantProxyFactory::getServantProxy(),此時(shí),ServantProxyFactory 已經(jīng)獲得相應(yīng)的 ObjectProxy 數(shù)組 ObjectProxy** ppObjectProxy,接著便可以調(diào)用:

ServantPrx sp = new ServantProxy(_comm, ppObjectProxy, _comm- getClientThreadNum());

進(jìn)行 ServantProxy 的構(gòu)造。構(gòu)造完成便可以呈現(xiàn)出如圖(2-1)的關(guān)系。在 ServantProxy 的構(gòu)造函數(shù)中可以看到,ServantProxy 在新建一個(gè) EndpointManagerThread 變量,這是對(duì)外獲取路由請(qǐng)求的類,是 TARS 為調(diào)用邏輯而提供的多種解決跨地區(qū)調(diào)用等問(wèn)題的方案。同時(shí)可以看到:

for(size_t i = 0;i   _objectProxyNum; ++i)
 (*(_objectProxy + i))- setServantProxy(this);
}

建立了 ServantProxy 與 ObjectProxy 的相互關(guān)聯(lián)關(guān)系。剩下的是讀取配置文件,獲取相應(yīng)的信息。構(gòu)造 ServantProxy 變量完成后,ServantProxyFactory::getServantProxy() 獲取一些超時(shí)參數(shù),賦值給 ServantProxy 變量,同時(shí)將其放進(jìn) map string, ServantPrx _servantProxy 中,方便下次直接查找獲取。ServantProxyFactory::getServantProxy() 將剛剛構(gòu)造的 ServantProxy 指針變量返回給調(diào)用他的 Communicator::getServantProxy(),在 Communicator::getServantProxy() 中:

ServantProxy * Communicator::getServantProxy(const string  objectName,const string  setName)
 ……
 return _servantProxyFactory- getServantProxy(objectName,setName);
}

直接將返回值返回給調(diào)用起 Communicator::getServantProxy() 的 Communicator::stringToProxy()。可以看到:

template class T  void stringToProxy(const string  objectName, T  proxy,const string  setName=)
 ServantProxy * pServantProxy = getServantProxy(objectName,setName);
 proxy = (typename T::element_type*)(pServantProxy);
}

Communicator::stringToProxy() 將返回值強(qiáng)制轉(zhuǎn)換為客戶端代碼中與 HelloPrx prx 同樣的類型 HelloPrx。由于函數(shù)參數(shù) proxy 就是 prx 的引用。那么實(shí)際就是將句柄 prx 成功初始化了,用戶可以利用句柄 prx 進(jìn)行 RPC 調(diào)用了。

同步調(diào)用

當(dāng)我們獲得一個(gè) ServantProxy 句柄后,便可以進(jìn)行 RPC 調(diào)用了。Tars 提供四種 RPC 調(diào)用方式,分別是同步調(diào)用,異步調(diào)用,promise 調(diào)用與協(xié)程調(diào)用。其中最簡(jiǎn)單最常見(jiàn)的 RPC 調(diào)用方式是同步調(diào)用,接下來(lái),將簡(jiǎn)單分析 Tars 的同步調(diào)用。

現(xiàn)假設(shè)有一個(gè) MyDemo.StringServer.StringServantObj 的服務(wù),提供一個(gè) RPC 接口是 append,傳入兩個(gè) string 類型的變量,返回兩個(gè) string 類型變量的拼接結(jié)果。而且假設(shè)有兩臺(tái)服務(wù)器,socket 標(biāo)識(shí)分別是 192.168.106.129:34132 與 192.168.106.130:34132,設(shè)置客戶端的網(wǎng)絡(luò)線程數(shù)為 3,那么執(zhí)行如下代碼:

Communicator _comm;
StringServantPrx _proxy;
_comm.stringToProxy(MyDemo.StringServer.StringServantObj@tcp -h 192.168.106.129 -p 34132 , _proxy);
_comm.stringToProxy(MyDemo.StringServer.StringServantObj@tcp -h 192.168.106.130 -p 34132 , _proxy);

經(jīng)過(guò)上文關(guān)于客戶端初始化的分析介紹可知,可以得出如下圖所示的關(guān)系圖:

獲取 StringServantPrx _proxy 后,直接調(diào)用:

string str1(abc-), str2(defg), rStr;
int retCode = _proxy- append(str1, str2, rStr);

成功進(jìn)行 RPC 同步調(diào)用后,返回的結(jié)果是 rStr =“abc-defg”。

同樣,我們先看看與同步調(diào)用相關(guān)的類圖,如下圖所示:

StringServantProxy 是繼承自 ServantProxy 的,StringServantProxy 提供了 RPC 同步調(diào)用的接口 Int32 append(),當(dāng)用戶發(fā)起同步調(diào)用_proxy- append(str1, str2, rStr) 時(shí),所進(jìn)行的函數(shù)調(diào)用過(guò)程如下圖所示。

在函數(shù) StringServantProxy::append() 中,程序會(huì)先構(gòu)造 ServantProxy::tars_invoke() 所需要的參數(shù),如請(qǐng)求包類型,RPC 方法名,方法參數(shù)等,需要值得注意的是,傳遞參數(shù)中有一個(gè) ResponsePacket 類型的變量,在同步調(diào)用中,最終的返回結(jié)果會(huì)放置在這個(gè)變量上。接下來(lái)便直接調(diào)用了 ServantProxy::tars_invoke() 方法:

tars_invoke(tars::TARSNORMAL,  append , _os.getByteBuffer(), context, _mStatus, rep);

在 ServantProxy::tars_invoke() 方法中,先創(chuàng)建一個(gè) ReqMessage 變量 msg,初始化 msg 變量,給變量賦值,如 Tars 版本號(hào),數(shù)據(jù)包類型,服務(wù)名,RPC 方法名,Tars 的上下文容器,同步調(diào)用的超時(shí)時(shí)間(單位為毫秒)等。最后,調(diào)用 ServantProxy::invoke() 進(jìn)行遠(yuǎn)程方法調(diào)用。

無(wú)論同步調(diào)用還是各種異步調(diào)用,ServantProxy::invoke() 都是 RPC 調(diào)用的必經(jīng)之地。在 ServantProxy::invoke() 中,繼續(xù)填充傳遞進(jìn)來(lái)的變量 ReqMessage msg。此外,還需要獲取調(diào)用者 caller 線程的線程私有數(shù)據(jù) ServantProxyThreadData,用來(lái)指導(dǎo) RPC 調(diào)用。客戶端的每個(gè) caller 線程都有屬于自己的維護(hù)調(diào)用上下文的線程私有數(shù)據(jù),如 hash 屬性,消息染色信息。最關(guān)鍵的還是每條 caller 線程與每條客戶端網(wǎng)絡(luò)線程 CommunicatorEpoll 進(jìn)行信息交互的橋梁——通信隊(duì)列 ReqInfoQueue 數(shù)組,數(shù)組中的每個(gè) ReqInfoQueue 元素負(fù)責(zé)與一條網(wǎng)絡(luò)線程進(jìn)行交互,如圖(1-13)所示,圖中橙色陰影代表數(shù)組 ReqInfoQueue[],陰影內(nèi)的圓柱體代表數(shù)組元素 ReqInfoQueue。假如客戶端 create 兩條線程(下稱 caller 線程)發(fā)起 StringServant 服務(wù)的 RPC 請(qǐng)求,且客戶端網(wǎng)絡(luò)線程數(shù)設(shè)置為 2,那么兩條 caller 線程各自有屬于自己的線程私有數(shù)據(jù)請(qǐng)求隊(duì)列數(shù)組 ReqInfoQueue[],數(shù)組里面的 ReqInfoQueue 元素便是該數(shù)組對(duì)應(yīng)的 caller 線程與兩條網(wǎng)絡(luò)線程的通信橋梁,一條網(wǎng)絡(luò)線程對(duì)應(yīng)著數(shù)組里面的一個(gè)元素,通過(guò)網(wǎng)絡(luò)線程 ID 進(jìn)行數(shù)組索引。整個(gè)關(guān)系有點(diǎn)像生產(chǎn)者消費(fèi)者模型,生產(chǎn)者 Caller 線程向自己的線程私有數(shù)據(jù) **ReqInfoQueue[]** 中的第 N 個(gè)元素 ReqInfoQueue[N] push 請(qǐng)求包,消費(fèi)者客戶端第 N 個(gè)網(wǎng)絡(luò)線程就會(huì)從這個(gè)隊(duì)列中 pop 請(qǐng)求包。

閱讀代碼可能會(huì)發(fā)現(xiàn)幾個(gè)常量值,如 MAX_CLIENT_THREAD_NUM=64,這是最大網(wǎng)絡(luò)線程數(shù),在圖(1-13)中為單個(gè)請(qǐng)求隊(duì)列數(shù)組 ReqInfoQueue[] 的最大 size;MAX_CLIENT_NOTIFYEVENT_NUM=2048,在圖(1-13)中,可以看作 caller 線程的最大數(shù)量,或者請(qǐng)求隊(duì)列數(shù)組 ReqInfoQueue[] 的最大數(shù)量(反正兩者一一對(duì)應(yīng),每個(gè) caller 線程都有自己的線程私有數(shù)據(jù) ReqInfoQueue[])。

接著依據(jù) caller 線程的線程私有數(shù)據(jù)進(jìn)行第一次的負(fù)載均衡——選取 ObjectProxy(即選擇網(wǎng)絡(luò)線程 CommunicatorEpoll)和與之相對(duì)應(yīng)的 ReqInfoQueue:

ObjectProxy * pObjProxy = NULL;
ReqInfoQueue * pReqQ = NULL;
// 選擇網(wǎng)絡(luò)線程
selectNetThreadInfo(pSptd, pObjProxy, pReqQ);

在 ServantProxy::selectNetThreadInfo() 中,通過(guò)輪詢的形式來(lái)選取 ObjectProxy 與 ReqInfoQueue。

退出 ServantProxy::selectNetThreadInfo() 后,便得到 ObjectProxy_類型的 pObjProxy 及其對(duì)應(yīng)的 ReqInfoQueue_類型的 ReqInfoQueue,稍后通過(guò) pObjectProxy 來(lái)發(fā)送 RPC 請(qǐng)求,請(qǐng)求信息會(huì)暫存在 ReqInfoQueue 中。

由于是同步調(diào)用,需要新建一個(gè)條件變量去監(jiān)聽(tīng) RPC 的完成,可見(jiàn):

// 同步調(diào)用  new  一個(gè) ReqMonitor
assert(msg- pMonitor == NULL);
if(msg- eType == ReqMessage::SYNC_CALL)
 msg- bMonitorFin = false;
 if(pSptd- _sched)
  msg- bCoroFlag = true;
  msg- sched  = pSptd- _sched;
  msg- iCoroId = pSptd- _sched- getCoroutineId();
 else
  msg- pMonitor = new ReqMonitor;
}

創(chuàng)建完條件變量,接下來(lái)往 ReqInfoQueue 中 push_back() 請(qǐng)求信息包 msg,并通知 pObjProxy 所屬的 CommunicatorEpoll 進(jìn)行數(shù)據(jù)發(fā)送:

if(!pReqQ- push_back(msg,bEmpty))
 TLOGERROR([TARS][ServantProxy::invoke msgQueue push_back error num:    pSptd- _netSeq   ]    endl);
 delete msg;
 msg = NULL;
 pObjProxy- getCommunicatorEpoll()- notify(pSptd- _reqQNo, pReqQ);
 throw TarsClientQueueException( client queue full 
 
pObjProxy- getCommunicatorEpoll()- notify(pSptd- _reqQNo, pReqQ);

來(lái)到 CommunicatorEpoll::notify() 中,往請(qǐng)求事件通知數(shù)組 NotifyInfo _notify[] 中添加請(qǐng)求事件,通知 CommunicatorEpoll 進(jìn)行請(qǐng)求包的發(fā)送。注意了,這個(gè)函數(shù)的作用僅僅是通知網(wǎng)絡(luò)線程準(zhǔn)備發(fā)送數(shù)據(jù),通過(guò) TC_Epoller::mod() 或者 TC_Epoller::add() 觸發(fā)一個(gè) EPOLLIN 事件,從而促使阻塞在 TC_Epoller::wait()(在 CommunicatorEpoll::run() 中阻塞)的網(wǎng)絡(luò)線程 CommunicatorEpoll 被喚醒,并設(shè)置喚醒后的 epoll_event 中的聯(lián)合體 epoll_data 變量為 _notify[iSeq].stFDInfo:

void CommunicatorEpoll::notify(size_t iSeq,ReqInfoQueue * msgQueue)
 assert(iSeq   MAX_CLIENT_NOTIFYEVENT_NUM);
 
 if(_notify[iSeq].bValid)
  _ep.mod(_notify[iSeq].notify.getfd(),(long long) _notify[iSeq].stFDInfo, EPOLLIN);
  assert(_notify[iSeq].stFDInfo.p == (void*)msgQueue);
 else
  _notify[iSeq].stFDInfo.iType = FDInfo::ET_C_NOTIFY;
  _notify[iSeq].stFDInfo.p  = (void*)msgQueue;
  _notify[iSeq].stFDInfo.fd  = _notify[iSeq].eventFd;
  _notify[iSeq].stFDInfo.iSeq = iSeq;
  _notify[iSeq].notify.createSocket();
  _notify[iSeq].bValid  = true;
 
  _ep.add(_notify[iSeq].notify.getfd(),(long long) _notify[iSeq].stFDInfo, EPOLLIN);
}

就是經(jīng)過(guò)這么一個(gè)操作,網(wǎng)絡(luò)線程就可以被喚醒,喚醒后通過(guò) epoll_event 變量可獲得 _notify[iSeq].stFDInfo。接下來(lái)的請(qǐng)求發(fā)送與響應(yīng)的接收會(huì)在后面會(huì)詳細(xì)介紹。

隨后,代碼再次回到 ServantProxy::invoke(),阻塞于:

if(!msg- bMonitorFin)
 TC_ThreadLock::Lock lock(*(msg- pMonitor));
 // 等待直到網(wǎng)絡(luò)線程通知過(guò)來(lái)
 if(!msg- bMonitorFin)
 {  msg- pMonitor- wait();
}

等待網(wǎng)絡(luò)線程接收到數(shù)據(jù)后,對(duì)其進(jìn)行喚醒。接收到響應(yīng)后,檢查是否成功獲取響應(yīng),是則直接退出函數(shù)即可,響應(yīng)信息在傳入的參數(shù) msg 中:

if(msg- eStatus == ReqMessage::REQ_RSP   msg- response.iRet == TARSSERVERSUCCESS)
 snprintf(pSptd- _szHost, sizeof(pSptd- _szHost),  %s , msg- adapter- endpoint().desc().c_str());
 // 成功
 return;
}

若接收失敗,會(huì)拋出異常,并刪除 msg:

TarsException::throwException(ret, os.str());

若接收成功,退出 ServantProxy::invoke() 后,回到 ServantProxy::tars_invoke(),獲取 ResponsePacket 類型的響應(yīng)信息,并刪除 msg 包:

rsp = msg- response;
delete msg;
msg = NULL;

代碼回到 StringServantProxy::append(),此時(shí)經(jīng)過(guò)同步調(diào)用,可以直接獲取 RPC 返回值并回到客戶端中。

網(wǎng)絡(luò)線程發(fā)送請(qǐng)求

上面提到,當(dāng)在 ServantProxy::invoke() 中,調(diào)用 CommunicatorEpoll::notify() 通知網(wǎng)絡(luò)線程進(jìn)行請(qǐng)求發(fā)送后,接下來(lái),網(wǎng)絡(luò)線程的具體執(zhí)行流程如下圖所示:

由于 CommunicatorEpoll 繼承自 TC_Thread,在上文 1.2.2 節(jié)中的第 2 小點(diǎn)的初始化 CommunicatorEpoll 之后便調(diào)用其 CommunicatorEpoll::start() 函數(shù)啟動(dòng)網(wǎng)絡(luò)線程,網(wǎng)絡(luò)線程在 CommunicatorEpoll::run() 中一直等待_ep.wait(iTimeout)。由于在上一節(jié)的描述中,在 CommunicatorEpoll::notify(),caller 線程發(fā)起了通知 notify,網(wǎng)絡(luò)線程在 CommunicatorEpoll::run() 就會(huì)調(diào)用 CommunicatorEpoll::handle() 處理通知:

void CommunicatorEpoll::run()
 ......
  try
  {  int iTimeout = ((_waitTimeout   _timeoutCheckInterval) ? _waitTimeout : _timeoutCheckInterval);
 
  int num = _ep.wait(iTimeout);
 
  if(_terminate)
  {
  break;
  }
 
  // 先處理 epoll 的網(wǎng)絡(luò)事件
  for (int i = 0; i   num; ++i)
  {  // 獲取 epoll_event 變量的 data,就是 1.3.1 節(jié)中提過(guò)的 _notify[iSeq].stFDInfo
  const epoll_event  ev = _ep.get(i);
  uint64_t data = ev.data.u64;
 
  if(data == 0)
  {
  continue; //data 非指針,  退出循環(huán)
  }
  handle((FDInfo*)data, ev.events);
  }
  }
 ......
 
}

在 CommunicatorEpoll::handle() 中,通過(guò)傳遞進(jìn)來(lái)的 epoll_event 中的 data 成員變量獲取前面被選中的 ObjectProxy 并調(diào)用其 ObjectProxy::invoke() 函數(shù):

void CommunicatorEpoll::handle(FDInfo * pFDInfo, uint32_t events)
  assert(pFDInfo != NULL);
 
  // 隊(duì)列有消息通知過(guò)來(lái)
  if(FDInfo::ET_C_NOTIFY == pFDInfo- iType)
  {  ReqInfoQueue * pInfoQueue=(ReqInfoQueue*)pFDInfo- 
  ReqMessage * msg = NULL;
 
  try
  {  while(pInfoQueue- pop_front(msg))
  {
  ......
 
  try
  {  msg- pObjectProxy- invoke(msg);
  }
  ......
  }
  }
  ......
  }
  ......
}

在 ObjectProxy::invoke() 中將進(jìn)行第二次的負(fù)載均衡,像圖(1-4)所示,每個(gè) ObjectProxy 通過(guò) EndpointManager 可以以不同的負(fù)載均衡方式對(duì) AdapterProxy 進(jìn)行選取選擇:

void ObjectProxy::invoke(ReqMessage * msg)
 ......
 // 選擇一個(gè)遠(yuǎn)程服務(wù)的 Adapter 來(lái)調(diào)用
 AdapterProxy * pAdapterProxy = NULL;
 bool bFirst = _endpointManger- selectAdapterProxy(msg, pAdapterProxy);
 ......
}

在 EndpointManager:: selectAdapterProxy() 中,有多種負(fù)載均衡的方式來(lái)選取 AdapterProxy,如 getHashProxy(),getWeightedProxy(),getNextValidProxy() 等。

獲取 AdapterProxy 之后,便將選擇到的 AdapterProxy 賦值給 EndpointManager:: selectAdapterProxy() 函數(shù)中的引用參數(shù) pAdapterProxy,隨后執(zhí)行:

void ObjectProxy::invoke(ReqMessage * msg)
 ......
 msg- adapter = pAdapterProxy;
 pAdapterProxy- invoke(msg);
}

調(diào)用 pAdapterProxy 將請(qǐng)求信息發(fā)送出去。而在 AdapterProxy::invoke() 中,AdapterProxy 將調(diào)用 Transceiver::sendRequset() 進(jìn)行請(qǐng)求的發(fā)送。至此,對(duì)應(yīng)同步調(diào)用的網(wǎng)絡(luò)線程發(fā)送請(qǐng)求的工作就結(jié)束了,網(wǎng)絡(luò)線程會(huì)回到 CommunicatorEpoll::run() 中,繼續(xù)等待數(shù)據(jù)的收發(fā)。

網(wǎng)絡(luò)線程接收響應(yīng)

當(dāng)網(wǎng)絡(luò)線程 CommunicatorEpoll 接收到響應(yīng)數(shù)據(jù)之后,如同之前發(fā)送請(qǐng)求那樣,在 CommunicatorEpoll::run() 中,程序獲取活躍的 epoll_event 的變量,并將其中的 epoll_data_t data 傳遞給 CommunicatorEpoll::handle():

// 先處理 epoll 的網(wǎng)絡(luò)事件
for (int i = 0; i   num; ++i)
 const epoll_event  ev = _ep.get(i);
 uint64_t data = ev.data.u64;
 
 if(data == 0)
 {
  continue; //data 非指針,  退出循環(huán)
 handle((FDInfo*)data, ev.events);
}

接下來(lái)的程序流程如下圖所示:

在 CommunicatorEpoll::handle() 中,從 epoll_data::data 中獲取 Transceiver 指針,并調(diào)用 CommunicatorEpoll::handleInputImp():

Transceiver *pTransceiver = (Transceiver*)pFDInfo- 
// 先收包
if (events   EPOLLIN)
handleInputImp(pTransceiver);
 catch(exception   e)
TLOGERROR([TARS]CommunicatorEpoll::handle exp: e.what()  ,line: __LINE__ endl);
 catch(...)
TLOGERROR([TARS]CommunicatorEpoll::handle| __LINE__ endl);
}

在 CommunicatorEpoll::handleInputImp() 中,除了對(duì)連接的判斷外,主要做兩件事,調(diào)用 Transceiver::doResponse() 以及 AdapterProxy::finishInvoke(ResponsePacket),前者的工作是從 socket 連接中獲取響應(yīng)數(shù)據(jù)并判斷接收的數(shù)據(jù)是否為一個(gè)完整的 RPC 響應(yīng)包。后者的作用是將響應(yīng)結(jié)果返回給客戶端,同步調(diào)用的會(huì)喚醒阻塞等待在條件變量中的 caller 線程,異步調(diào)用的會(huì)在異步回調(diào)處理線程中執(zhí)行回調(diào)函數(shù)。在 AdapterProxy::finishInvoke(ResponsePacket) 中,需要注意一點(diǎn),假如是同步調(diào)用的,需要獲取響應(yīng)包 rsp 對(duì)應(yīng)的 ReqMessage 信息,在 Tars 中,執(zhí)行:

ReqMessage * msg = NULL;
//  獲取響應(yīng)包 rsp 對(duì)應(yīng)的 msg 信息,并在超時(shí)隊(duì)列中剔除該 msg
bool retErase = _timeoutQueue- erase(rsp.iRequestId, msg);

在找回響應(yīng)包對(duì)應(yīng)的請(qǐng)求信息 msg 的同時(shí),將其在超時(shí)隊(duì)列中剔除出來(lái)。接著執(zhí)行:

msg- eStatus = ReqMessage::REQ_RSP;
msg- response = rsp;
finishInvoke(msg);

程序調(diào)用另一個(gè)重載函數(shù) AdapterProxy::finishInvoke(ReqMessage*),在 AdapterProxy::finishInvoke(ReqMessage*) 中,不同的 RPC 調(diào)用方式會(huì)執(zhí)行不同的動(dòng)作,例如同步調(diào)用會(huì)喚醒對(duì)應(yīng)的 caller 線程:

// 同步調(diào)用,喚醒 ServantProxy 線程
if(msg- eType == ReqMessage::SYNC_CALL)
 if(!msg- bCoroFlag)
  assert(msg- pMonitor);
 
  TC_ThreadLock::Lock sync(*(msg- pMonitor));
  msg- pMonitor- notify();
  msg- bMonitorFin = true;
 else
  msg- sched- put(msg- iCoroId);
 
 return ;
}

至此,對(duì)應(yīng)同步調(diào)用的網(wǎng)絡(luò)線程接收響應(yīng)的工作就結(jié)束了,網(wǎng)絡(luò)線程會(huì)回到 CommunicatorEpoll::run() 中,繼續(xù)等待數(shù)據(jù)的收發(fā)。綜上,客戶端同步調(diào)用的過(guò)程如下圖所示。

異步調(diào)用

在 Tars 中,除了最常見(jiàn)的同步調(diào)用之外,還可以進(jìn)行異步調(diào)用,異步調(diào)用可分三種:普通的異步調(diào)用,promise 異步調(diào)用與協(xié)程異步調(diào)用,這里簡(jiǎn)單介紹普通的異步調(diào)用,看看其與上文介紹的同步調(diào)用有何異同。

異步調(diào)用不會(huì)阻塞整個(gè)客戶端程序,調(diào)用完成(請(qǐng)求發(fā)送)之后,用戶可以繼續(xù)處理其他事情,等接收到響應(yīng)之后,Tars 會(huì)在異步處理線程當(dāng)中執(zhí)行用戶實(shí)現(xiàn)好的回調(diào)函數(shù)。在這里,會(huì)用到《Effective C++》中條款 35 所介紹的“藉由 Non-Virtual Interface 手法實(shí)現(xiàn) Template Method 模式”,用戶需要繼承一個(gè) XXXServantPrxCallback 基類,并實(shí)現(xiàn)里面的虛函數(shù),異步回調(diào)線程會(huì)在收到響應(yīng)包之后回調(diào)這些虛函數(shù),具體的異步調(diào)用客戶端示例這里不作詳細(xì)介紹,在 Tars 的 Example 中會(huì)找到相應(yīng)的示例代碼。

初始化

本文第一章已經(jīng)詳細(xì)介紹了客戶端的初始化,這里再簡(jiǎn)單提一下,在第一章的“1.2.2 初始化代碼跟蹤 - 2. 執(zhí)行 Communicator 的初始化函數(shù)”中,已經(jīng)提到說(shuō),在每一個(gè)網(wǎng)絡(luò)線程 CommunicatorEpoll 的初始化過(guò)程中,會(huì)創(chuàng)建_asyncThreadNum 條異步線程,等待異步調(diào)用的時(shí)候處理響應(yīng)數(shù)據(jù):

CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThreadSeq)
 ......
 // 異步線程數(shù)
 _asyncThreadNum = TC_Common::strto size_t (pCommunicator- getProperty( asyncthread ,  3));
 
 if(_asyncThreadNum == 0)
  _asyncThreadNum = 3;
 
 if(_asyncThreadNum   MAX_CLIENT_ASYNCTHREAD_NUM)
  _asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM;
 ......
 // 異步隊(duì)列的大小
 size_t iAsyncQueueCap = TC_Common::strto size_t (pCommunicator- getProperty( asyncqueuecap ,  10000));
 if(iAsyncQueueCap   10000)
  iAsyncQueueCap = 10000;
 ......
 // 創(chuàng)建異步線程
 for(size_t i = 0; i   _asyncThreadNum; ++i)
  _asyncThread[i] = new AsyncProcThread(iAsyncQueueCap);
  _asyncThread[i]- start();
 ......
}

在開(kāi)始講述異步調(diào)用與接收響應(yīng)之前,先看看大致的調(diào)用過(guò)程,與圖(1-16)的同步調(diào)用來(lái)個(gè)對(duì)比。

跟同步調(diào)用的示例一樣,現(xiàn)在有一 MyDemo.StringServer.StringServantObj 的服務(wù),提供一個(gè) RPC 接口是 append,傳入兩個(gè) string 類型的變量,返回兩個(gè) string 類型變量的拼接結(jié)果。在執(zhí)行 tars2cpp 而生成的文件中,定義了回調(diào)函數(shù)基類 StringServantPrxCallback,用戶需要 public 繼承這個(gè)基類并實(shí)現(xiàn)自己的方法,例如:

class asyncClientCallback : public StringServantPrxCallback {
public:
 void callback_append(Int32 ret, const string  rStr) {
 cout    append: async callback success and retCode is     ret     ,rStr is     rStr    \n 
 }
 void callback_append_exception(Int32 ret) {
 cout    append: async callback fail and retCode is     ret    \n 
 }
};

然后,用戶就可以通過(guò) proxy- async_append(new asyncClientCallback(), str1, str2) 進(jìn)行異步調(diào)用了,調(diào)用過(guò)程與上文的同步調(diào)用差不多,函數(shù)調(diào)用流程如下圖所示,可以與圖(1-12)進(jìn)行比較,看看同步調(diào)用與異步調(diào)用的異同。

在異步調(diào)用中,客戶端發(fā)起異步調(diào)用_proxy- async_append(new asyncClientCallback(), str1, str2) 后,在函數(shù) StringServantProxy::async_append() 中,程序同樣會(huì)先構(gòu)造 ServantProxy::tars_invoke_async() 所需要的參數(shù),如請(qǐng)求包類型,RPC 方法名,方法參數(shù)等,與同步調(diào)用的一個(gè)區(qū)別是,還傳遞了承載回調(diào)函數(shù)的派生類實(shí)例。接下來(lái)便直接調(diào)用了 ServantProxy::tars_invoke_async() 方法:

tars_invoke_async(tars::TARSNORMAL, append , _os.getByteBuffer(), context, _mStatus, callback)

在 ServantProxy::tars_invoke_async() 方法中,先創(chuàng)建一個(gè) ReqMessage 變量 msg,初始化 msg 變量,給變量賦值,如 Tars 版本號(hào),數(shù)據(jù)包類型,服務(wù)名,RPC 方法名,Tars 的上下文容器,異步調(diào)用的超時(shí)時(shí)間(單位為毫秒)以及異步調(diào)用后的回調(diào)函數(shù) ServantProxyCallbackPtr callback(等待異步調(diào)用返回響應(yīng)后回調(diào)里面的函數(shù))等。最后,與同步調(diào)用一樣,調(diào)用 ServantProxy::invoke() 進(jìn)行遠(yuǎn)程方法調(diào)用。

在 ServantProxy::invoke() 中,繼續(xù)填充傳遞進(jìn)來(lái)的變量 ReqMessage msg。此外,還需要獲取調(diào)用者 caller 線程的線程私有數(shù)據(jù) ServantProxyThreadData,用來(lái)指導(dǎo) RPC 調(diào)用。與同步調(diào)用一樣,利用 ServantProxy::selectNetThreadInfo() 來(lái)輪詢選取 ObjectProxy(網(wǎng)絡(luò)線程 CommunicatorEpoll)與對(duì)應(yīng)的 ReqInfoQueue,詳細(xì)可看同步調(diào)用中的介紹,注意區(qū)分客戶端中的調(diào)用者 caller 線程與網(wǎng)絡(luò)線程,以及之間的通信橋梁。

退出 ServantProxy::selectNetThreadInfo() 后,便得到 ObjectProxy_類型的 pObjProxy 及其對(duì)應(yīng)的 ReqInfoQueue_類型的 ReqInfoQueue,在異步調(diào)用中,不需要建立條件變量來(lái)阻塞進(jìn)程,直接通過(guò) pObjectProxy 來(lái)發(fā)送 RPC 請(qǐng)求,請(qǐng)求信息會(huì)暫存在 ReqInfoQueue 中:

if(!pReqQ- push_back(msg,bEmpty))
 TLOGERROR([TARS][ServantProxy::invoke msgQueue push_back error num:    pSptd- _netSeq   ]    endl);
 
 delete msg;
 msg = NULL;
 
 pObjProxy- getCommunicatorEpoll()- notify(pSptd- _reqQNo, pReqQ);
 
 throw TarsClientQueueException( client queue full 
 
pObjProxy- getCommunicatorEpoll()- notify(pSptd- _reqQNo, pReqQ);

在之后,就不需要做任何的工作,退出層層函數(shù)調(diào)用,回到客戶端中,程序可以繼續(xù)執(zhí)行其他動(dòng)作。

接收響應(yīng)與函數(shù)回調(diào)

異步調(diào)用的請(qǐng)求發(fā)送過(guò)程與同步調(diào)用的一致,都是在網(wǎng)絡(luò)線程中通過(guò) ObjectProxy 去調(diào)用 AdapterProxy 來(lái)發(fā)送數(shù)據(jù)。但是在接收到響應(yīng)之后,通過(guò)圖(1-15)可以看到,在函數(shù) AdapterProxy::finishInvoke(ReqMessage*) 中,同步調(diào)用會(huì)通過(guò) msg- pMonitor- notify() 喚醒客戶端的 caller 線程來(lái)接收響應(yīng)包,而在異步調(diào)用中,則是如圖(1-19)所示,CommunicatorEpoll 與 AsyncProcThread 的關(guān)系如圖(1-20)所示。

在函數(shù) AdapterProxy::finishInvoke(ReqMessage*) 中,程序通過(guò):

// 異步回調(diào),放入回調(diào)處理線程中
_objectProxy- getCommunicatorEpoll()- pushAsyncThreadQueue(msg);

將信息包 msg(帶響應(yīng)信息)放到異步回調(diào)處理線程中,在 CommunicatorEpoll::pushAsyncThreadQueue() 中,通過(guò)輪詢的方式選擇異步回調(diào)處理線程處理接收到的響應(yīng)包,異步處理線程數(shù)默認(rèn)是 3,最大是 1024。

void CommunicatorEpoll::pushAsyncThreadQueue(ReqMessage * msg)
 // 先不考慮每個(gè)線程隊(duì)列數(shù)目不一致的情況
 _asyncThread[_asyncSeq]- push_back(msg);
 _asyncSeq ++;
 
 if(_asyncSeq == _asyncThreadNum)
  _asyncSeq = 0;
}

選取之后,通過(guò) AsyncProcThread::push_back(),將 msg 包放在響應(yīng)包隊(duì)列 AsyncProcThread::_msgQueue 中,然后通過(guò) AsyncProcThread:: notify() 函數(shù)通知本異步回調(diào)處理線程進(jìn)行處理,AsyncProcThread:: notify() 函數(shù)可以令阻塞在 AsyncProcThread:: run() 中的 AsyncProcThread::timedWait() 的異步處理線程被喚醒。

在 AsyncProcThread::run() 中,主要執(zhí)行下面的程序進(jìn)行函數(shù)回調(diào):

if (_msgQueue- pop_front(msg))
 ......
 
  ReqMessagePtr msgPtr = msg;
  msg- callback- onDispatch(msgPtr);
 catch (exception  e)
  TLOGERROR([TARS][AsyncProcThread exception]:    e.what()   endl);
 catch (...)
  TLOGERROR([TARS][AsyncProcThread exception.]    endl);
}

通過(guò) msg- callback,程序可以調(diào)用回調(diào)函數(shù)基類 StringServantPrxCallback 里面的 onDispatch() 函數(shù)。在 StringServantPrxCallback:: onDispatch() 中,分析此次響應(yīng)所對(duì)應(yīng)的 RPC 方法名,獲取響應(yīng)結(jié)果,并通過(guò)動(dòng)態(tài)多態(tài),執(zhí)行用戶所定義好的派生類的虛函數(shù)。通過(guò) ReqMessagePtr 的引用計(jì)數(shù),還可以將 ReqNessage* msg 刪除掉,與同步調(diào)用不同,同步調(diào)用的 msg 的新建與刪除都在 caller 線程中,而異步調(diào)用的 msg 在 caller 線程上構(gòu)造,在異步回調(diào)處理線程中析構(gòu)。

“TARS C++ 客戶端是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注丸趣 TV 網(wǎng)站,丸趣 TV 小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-25發(fā)表,共計(jì)22245字。
轉(zhuǎn)載說(shuō)明:除特殊說(shuō)明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請(qǐng)注明出處。
評(píng)論(沒(méi)有評(píng)論)
主站蜘蛛池模板: 绥中县| 陈巴尔虎旗| 屏边| 横山县| 蓝山县| 镇远县| 富川| 沾益县| 隆尧县| 宣汉县| 象州县| 江门市| 白城市| 西乡县| 滨海县| 肃南| 古田县| 德江县| 安义县| 铅山县| 桑日县| 阿城市| 淅川县| 北碚区| 石林| 龙里县| 石柱| 咸阳市| 泾源县| 民乐县| 内丘县| 芦山县| 永丰县| 乡宁县| 肇源县| 蒙自县| 平陆县| 金华市| 镇原县| 尚志市| 鹤山市|