共計 6027 個字符,預計需要花費 16 分鐘才能閱讀完成。
本篇文章給大家分享的是有關如何進行 Spark 底層通信 RPC 源碼分析,丸趣 TV 小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著丸趣 TV 小編一起來看看吧。
RPC 通信: 無論是 hadoop2.x 的 Rpc 通信方式還是 Spark2.x 的 Rpc 通信方式,簡單通俗的來說就是兩個進程之間的遠程通信,比如 java 一個 A 項目里面有一個 class A,里面有一個 washA 方法一個 B 項目里面有一個 Class B 類,里面有一個方法是 washB,B 項目通過代理模式以及 java 的反射機制調用到 A 項目里面的 washA, 這種情況下就可以理解為是一個簡單的 Rpc 通信方式。
Spark2.x
Spark2.x 使用基于 RPC 的通信方式,去除了 1.x 的 Akka 的實現方式,只保留了 netty 的實現方式,Spark2.x Rpc 提供了上層抽象(RpcEndpoint、RpcEnv、RpcEndPointRef),具體的實現方式只要實現了定義的抽象就可以完成 Rpc 通信,Spark2.x 之后目前版本只保留了 Netty(NettyRpcEnv、NettyRpcEndpointRef)的實現,定義抽象最大的好處相信開發的朋友都很清楚,以后不管提供了什么方式的實現只要實現了 RPCEndpoint,RpcEnv,RpcEndpointRef 就可以完成的通信功能。比如自己寫一個自己版本的 Rpc 通信實現。
Spark2.x 的 Rpc 通信方式主要包括一下幾個重要方面
RpcEndpoint: 消息通信體,主要是用來接收消息、處理消息,實現了 RpcEndPoint 接口就是一個消息通信體(Master、Work),RpcEndpoint 需要向 RpcEnv 注冊
RpcEnv:Rpc 通信的上下文環境,消息發送過來首先經過 RpcEnv 然后路由給對應的 RpcEndPoint,得到 RpcEndPoint
RpcEndPointRef:RpcEndPoint 的引用如果要想某個 RpcEndPoint 發送消息,首先要通過 RpcEnv 得到 RpcEndPoint 的引用
RpcEndPoint 接口 里面的定義如下
val rpcEnv : RpcEnv // 得到 RpcEnv 對象
final def self: RpcEndpointRef = {// 返回一個 RpcEnpointRef 這個方法通常用來自己給自己發送消息
rpcEnv.endpointRef(this)
}
def receive: PartialFunction[Any, Unit]// 處理 RpcEndPointRef.send 或者 RpcEndPointRef.reply 方法,該方法不需要進行響應信息
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]// 處理 RpcEndPointref.ask 發送的消息,處理完之后需要給調用 ask 的通信端響應消息(reply)
def onError(cause: Throwable)// 處理消息失敗的時候會調用此方法
def onConnected(remoteAddress: RpcAddress)// 遠程連接的當前節點的時候觸發
def onDisconnected(remoteAddress: RpcAddress)// 遠程連接斷開時候觸發
def onNetworkError(cause: Throwable, remoteAddress: RpcAddress)// 遠程連接發生網絡異常時觸發
def onStop()// 停止 RpcEndPoint
def onStart()// 啟動 RpcEndPoint,這里不僅僅是網絡上說的啟動 RpcEndPoint 處理任何消息,onStart 方法里面很多情況下可以寫自己的 RpcEndPoint 的一些實現比如啟動端口,或者創建目錄
但是 RpcEndPoint 只有在 onStart 方法做一些處理之后 才可以接受 RpcEndPointRef 發送的消息
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint// 因為 receive 是并發操作如果要現成安全就是用 threadSafeRpcEndPoint
RpcEndPoint 的生命周期 構造 – onStart– receive — onStop, 注意 onStart 的方法是在調用 setRpcEndPoint 注冊之后就會執行任何 RpcEndPoint 的 onStart 方法都是在注冊之后執行的
原因后面的源碼的提到
RpcEndpointRef: 抽象類
def address: RpcAddress // 根據主機名端口返回一個 RppAddress
def name: String//name 一個字符串 暫時不知道干嘛的
def send(message: Any): Unit// 向 RpcEndPoint 發送一個消息 不需要返回結果
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout) // 向 RpcEndPoint 發送消息并得到返回結果
def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)// 想 RpcEndPoint 發送消息并在一定時間內返回結果 失敗的時候并且進行一定次數的重試
RpcEnv
private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef// 傳入 RpcEndPoint 得到 RpcEndPointref 對象
def address: RpcAddress// 根據主機名端口返回一個 RppAddress
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef// 注冊 RpcEndPoint 返回對應的 RpcEndPointRef
def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]// 通過 uri 一步獲取 RpcEndPointRef
def stop(endpoint: RpcEndpointRef): Unit// 停止 RpcEndPoint 根據 RpcEndPointRef
def shutdown(): Unit// 關閉 RpcEndPoint
def awaitTermination(): Unit// 等待 RpcEndPoint 退出
object RpcEnv
def create(
name: String,
host: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean = false): RpcEnv = {
val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
new NettyRpcEnvFactory().create(config)
}
// 通過 RpcEnvFactory.create 創建 RpcEnv 環境
RpcEnvConfig
private[spark] case class RpcEnvConfig(
conf: SparkConf,
name: String,
host: String,
port: Int,
securityManager: SecurityManager,
clientMode: Boolean)
case 類 里面包括 SparkConf,name,host,port 等
NettyRpcEnv NettyRpcEnv 通過 NettyRpcEnvFactory 的 create 方法創建
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)// 創建 nettyEnv
private val dispatcher: Dispatcher = new Dispatcher(this)
Dispatcher 負責 RPC 消息的路由,它能夠將消息路由到對應的 RpcEndpoint 進行處理, 同時存放 RpcEndPoint 與 RpcEndPointRef 的映射
NettyStreamManager 負責提供文件服務(文件、JAR 文件、目錄)
TransportContext 負責管理網路傳輸上下文信息:創建 MessageEncoder、MessageDecoder、TransportClientFactory、TransportServer
NettyRpcHandler 負責處理網絡 IO 事件,接收 RPC 調用請求,并通過 Dispatcher 派發消息
這里說一下 Dispatcher 該類主要負責 Rpc 消息路由 里面有一個內部累 EndPointData 但是有一個現成安全的 Inbox 這里面存放的時候收到的消息,非常重要后面會做具體分析
private class EndpointData(
val name: String,
val endpoint: RpcEndpoint,
val ref: NettyRpcEndpointRef) {
val inbox = new Inbox(ref, endpoint)
}
private val endpoints = new ConcurrentHashMap[String, EndpointData]// 存放 name- 對應的 EndPoint 的信息
private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]// 存放 RpcEndpoint, RpcEndpointRef 的映射關系
private val receivers = new LinkedBlockingQueue[EndpointData]// 隊列下面會有一個現成不斷的從里面取出來處理
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException(RpcEnv has been stopped)
}
if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
throw new IllegalArgumentException(s There is already an RpcEndpoint called $name)
}
val data = endpoints.get(name)
endpointRefs.put(data.endpoint, data.ref)
receivers.offer(data) // for the OnStart message
}
endpointRef
}
// 注冊 RpcEndPoint 在這里面發生 同時將 data put 到 receivers
在 NettyRpcEndPoint 里面有一個 threadpool
private val threadpool: ThreadPoolExecutor = {
val numThreads = nettyEnv.conf.getInt(spark.rpc.netty.dispatcher.numThreads ,
math.max(2, Runtime.getRuntime.availableProcessors()))
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, dispatcher-event-loop)
for (i – 0 until numThreads) {
pool.execute(new MessageLoop)
}
pool
}
MessageLoop 是一個實現了 Runnable 的類,里面的 run 方法里面不斷從 receivers 取出來進行處理
重要代碼 data.inbox.process(Dispatcher.this)
這個里面有一個非常重要的點就是 什么時候調用 onStart 的方法因為 receivers 里面存放的是 EndPoint 的信息 同時創建 EndPointData 對象
進入 Inbox 里面看一下
inbox = // Give this an alias so we can use it more clearly in closures.
@GuardedBy(this)
protected val messages = new java.util.LinkedList[InboxMessage]()
inbox.synchronized {
messages.add(OnStart)
}
創建這個類的時候會有一個 messagelinkedList 的 list 集合 在創建這個結合之后 就會將 onStart 方法添加到里面,并且是現成安全的
然后 process 方法里面會不斷的拿到集合的數據來進行對應的操作
case OnStart =
endpoint.onStart()
if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
inbox.synchronized {
if (!stopped) {
enableConcurrent = true
}
}
}
這個時候就會調用 onStart 方法
這個時候相當于 RpcEndPoint 可以接受消息并且處理了
Spark Rpc 通信方式 分為本地消息和遠程消息,本地消息相當于調用的方法直接存放到 Index(中文收件箱),遠程消息需要走 NettyRpcHandler
以上就是如何進行 Spark 底層通信 RPC 源碼分析,丸趣 TV 小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注丸趣 TV 行業資訊頻道。