久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何進行Spark底層通信RPC源碼分析

160次閱讀
沒有評論

共計 6027 個字符,預計需要花費 16 分鐘才能閱讀完成。

本篇文章給大家分享的是有關如何進行 Spark 底層通信 RPC 源碼分析,丸趣 TV 小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著丸趣 TV 小編一起來看看吧。

RPC 通信: 無論是 hadoop2.x 的 Rpc 通信方式還是 Spark2.x 的 Rpc 通信方式,簡單通俗的來說就是兩個進程之間的遠程通信,比如 java 一個 A 項目里面有一個 class A,里面有一個 washA 方法一個 B 項目里面有一個 Class B 類,里面有一個方法是 washB,B 項目通過代理模式以及 java 的反射機制調用到 A 項目里面的 washA, 這種情況下就可以理解為是一個簡單的 Rpc 通信方式。

Spark2.x

Spark2.x 使用基于 RPC 的通信方式,去除了 1.x 的 Akka 的實現方式,只保留了 netty 的實現方式,Spark2.x Rpc 提供了上層抽象(RpcEndpoint、RpcEnv、RpcEndPointRef),具體的實現方式只要實現了定義的抽象就可以完成 Rpc 通信,Spark2.x 之后目前版本只保留了 Netty(NettyRpcEnv、NettyRpcEndpointRef)的實現,定義抽象最大的好處相信開發的朋友都很清楚,以后不管提供了什么方式的實現只要實現了 RPCEndpoint,RpcEnv,RpcEndpointRef 就可以完成的通信功能。比如自己寫一個自己版本的 Rpc 通信實現。

Spark2.x 的 Rpc 通信方式主要包括一下幾個重要方面

RpcEndpoint: 消息通信體,主要是用來接收消息、處理消息,實現了 RpcEndPoint 接口就是一個消息通信體(Master、Work),RpcEndpoint 需要向 RpcEnv 注冊

RpcEnv:Rpc 通信的上下文環境,消息發送過來首先經過 RpcEnv 然后路由給對應的 RpcEndPoint,得到 RpcEndPoint

RpcEndPointRef:RpcEndPoint 的引用如果要想某個 RpcEndPoint 發送消息,首先要通過 RpcEnv 得到 RpcEndPoint 的引用

RpcEndPoint 接口 里面的定義如下

val rpcEnv : RpcEnv // 得到 RpcEnv 對象

final def self: RpcEndpointRef = {// 返回一個 RpcEnpointRef 這個方法通常用來自己給自己發送消息

  rpcEnv.endpointRef(this)

  }

def receive: PartialFunction[Any, Unit]// 處理 RpcEndPointRef.send 或者 RpcEndPointRef.reply 方法,該方法不需要進行響應信息

def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]// 處理 RpcEndPointref.ask 發送的消息,處理完之后需要給調用 ask 的通信端響應消息(reply)

def onError(cause: Throwable)// 處理消息失敗的時候會調用此方法

def onConnected(remoteAddress: RpcAddress)// 遠程連接的當前節點的時候觸發

def onDisconnected(remoteAddress: RpcAddress)// 遠程連接斷開時候觸發

def onNetworkError(cause: Throwable, remoteAddress: RpcAddress)// 遠程連接發生網絡異常時觸發

def onStop()// 停止 RpcEndPoint

def onStart()// 啟動 RpcEndPoint,這里不僅僅是網絡上說的啟動 RpcEndPoint 處理任何消息,onStart 方法里面很多情況下可以寫自己的 RpcEndPoint 的一些實現比如啟動端口,或者創建目錄

但是 RpcEndPoint 只有在 onStart 方法做一些處理之后 才可以接受 RpcEndPointRef 發送的消息

private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint// 因為 receive 是并發操作如果要現成安全就是用 threadSafeRpcEndPoint

RpcEndPoint 的生命周期 構造 – onStart– receive — onStop, 注意 onStart 的方法是在調用 setRpcEndPoint 注冊之后就會執行任何 RpcEndPoint 的 onStart 方法都是在注冊之后執行的

原因后面的源碼的提到

RpcEndpointRef: 抽象類

  def address: RpcAddress // 根據主機名端口返回一個 RppAddress

def name: String//name 一個字符串 暫時不知道干嘛的

def send(message: Any): Unit// 向 RpcEndPoint 發送一個消息 不需要返回結果

 def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout) // 向 RpcEndPoint 發送消息并得到返回結果

def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)// 想 RpcEndPoint 發送消息并在一定時間內返回結果 失敗的時候并且進行一定次數的重試

RpcEnv

  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef// 傳入 RpcEndPoint 得到 RpcEndPointref 對象

  def address: RpcAddress// 根據主機名端口返回一個 RppAddress

def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef// 注冊 RpcEndPoint 返回對應的 RpcEndPointRef

def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]// 通過 uri 一步獲取 RpcEndPointRef

 def stop(endpoint: RpcEndpointRef): Unit// 停止 RpcEndPoint 根據 RpcEndPointRef

  def shutdown(): Unit// 關閉 RpcEndPoint

 def awaitTermination(): Unit// 等待 RpcEndPoint 退出

object RpcEnv

 def create(

  name: String,

  host: String,

  port: Int,

  conf: SparkConf,

  securityManager: SecurityManager,

  clientMode: Boolean = false): RpcEnv = {

  val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)

  new NettyRpcEnvFactory().create(config)

  }

// 通過 RpcEnvFactory.create 創建 RpcEnv 環境

RpcEnvConfig

private[spark] case class RpcEnvConfig(

  conf: SparkConf,

  name: String,

  host: String,

  port: Int,

  securityManager: SecurityManager,

  clientMode: Boolean)

case 類 里面包括 SparkConf,name,host,port 等

NettyRpcEnv NettyRpcEnv 通過 NettyRpcEnvFactory 的 create 方法創建

 val nettyEnv =

  new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)// 創建 nettyEnv

 private val dispatcher: Dispatcher = new Dispatcher(this)

Dispatcher 負責 RPC 消息的路由,它能夠將消息路由到對應的 RpcEndpoint 進行處理, 同時存放 RpcEndPoint 與 RpcEndPointRef 的映射

NettyStreamManager 負責提供文件服務(文件、JAR 文件、目錄)

TransportContext 負責管理網路傳輸上下文信息:創建 MessageEncoder、MessageDecoder、TransportClientFactory、TransportServer

NettyRpcHandler 負責處理網絡 IO 事件,接收 RPC 調用請求,并通過 Dispatcher 派發消息

這里說一下 Dispatcher 該類主要負責 Rpc 消息路由 里面有一個內部累 EndPointData 但是有一個現成安全的 Inbox 這里面存放的時候收到的消息,非常重要后面會做具體分析

private class EndpointData(

  val name: String,

  val endpoint: RpcEndpoint,

  val ref: NettyRpcEndpointRef) {

  val inbox = new Inbox(ref, endpoint)

  }

  private val endpoints = new ConcurrentHashMap[String, EndpointData]// 存放 name- 對應的 EndPoint 的信息

  private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]// 存放 RpcEndpoint, RpcEndpointRef 的映射關系

  private val receivers = new LinkedBlockingQueue[EndpointData]// 隊列下面會有一個現成不斷的從里面取出來處理

 def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {

  val addr = RpcEndpointAddress(nettyEnv.address, name)

  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)

  synchronized {

  if (stopped) {

  throw new IllegalStateException(RpcEnv has been stopped)

  }

  if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {

  throw new IllegalArgumentException(s There is already an RpcEndpoint called $name)

   }

  val data = endpoints.get(name)

  endpointRefs.put(data.endpoint, data.ref)

  receivers.offer(data)  // for the OnStart message

  }

  endpointRef

  }

// 注冊 RpcEndPoint 在這里面發生 同時將 data put 到 receivers 

在 NettyRpcEndPoint 里面有一個 threadpool

private val threadpool: ThreadPoolExecutor = {

  val numThreads = nettyEnv.conf.getInt(spark.rpc.netty.dispatcher.numThreads ,

  math.max(2, Runtime.getRuntime.availableProcessors()))

  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, dispatcher-event-loop)

  for (i – 0 until numThreads) {

  pool.execute(new MessageLoop)

  }

  pool

  }

MessageLoop 是一個實現了 Runnable 的類,里面的 run 方法里面不斷從 receivers 取出來進行處理

重要代碼   data.inbox.process(Dispatcher.this)

這個里面有一個非常重要的點就是 什么時候調用 onStart 的方法因為 receivers 里面存放的是 EndPoint 的信息 同時創建 EndPointData 對象

進入 Inbox 里面看一下

  inbox =   // Give this an alias so we can use it more clearly in closures.

  @GuardedBy(this)

  protected val messages = new java.util.LinkedList[InboxMessage]()

 inbox.synchronized {

  messages.add(OnStart)

  }

創建這個類的時候會有一個 messagelinkedList 的 list 集合 在創建這個結合之后 就會將 onStart 方法添加到里面,并且是現成安全的

然后 process 方法里面會不斷的拿到集合的數據來進行對應的操作

 case OnStart =

  endpoint.onStart()

  if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {

  inbox.synchronized {

  if (!stopped) {

  enableConcurrent = true

  }

  }

  }

這個時候就會調用 onStart 方法

這個時候相當于 RpcEndPoint 可以接受消息并且處理了

Spark Rpc 通信方式 分為本地消息和遠程消息,本地消息相當于調用的方法直接存放到 Index(中文收件箱),遠程消息需要走 NettyRpcHandler

以上就是如何進行 Spark 底層通信 RPC 源碼分析,丸趣 TV 小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注丸趣 TV 行業資訊頻道。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計6027字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 台东市| 搜索| 黑山县| 和田县| 白银市| 江华| 彭阳县| 蒙自县| 泾川县| 黑山县| 怀远县| 琼结县| 紫云| 乌拉特后旗| 浦县| 田林县| 农安县| 平遥县| 静乐县| 合阳县| 太原市| 建湖县| 德令哈市| 雷州市| 黄陵县| 长白| 阿图什市| 明水县| 宜城市| 塔城市| 临西县| 峨山| 交口县| 玉山县| 平南县| 平昌县| 正定县| 大宁县| 石景山区| 安阳市| 镇坪县|