
How Does the Spark Driver Start Up?


This article walks through how the Spark Driver starts up, following the relevant source code step by step.

SparkContext.scala

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts
  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  _taskScheduler.start()

Now let's see what SparkContext.createTaskScheduler actually does:

  case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)

We can see that _taskScheduler is an instance of TaskSchedulerImpl and _schedulerBackend is an instance of StandaloneSchedulerBackend, and that _schedulerBackend is handed to _taskScheduler through scheduler.initialize.
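
scheduler.initialize itself is small. Roughly, as a paraphrased sketch of TaskSchedulerImpl.initialize (details vary by Spark version, so treat this as an illustration rather than the exact source), it stores the backend and builds the root scheduling pool:

  def initialize(backend: SchedulerBackend) {
    // Keep the backend so that the scheduler can reach it later (e.g. to revive offers).
    this.backend = backend
    // Build the scheduling pool hierarchy according to spark.scheduler.mode (FIFO by default).
    schedulableBuilder = schedulingMode match {
      case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
    }
    schedulableBuilder.buildPools()
  }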

Next, let's see what _taskScheduler.start() actually does:

  override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }

The first thing it does is call backend.start(), whose implementation can be found in StandaloneSchedulerBackend:

override def start() {
  super.start()
  launcherBackend.connect()

  // The endpoint for executors to talk to us
  val driverUrl = RpcEndpointAddress(
    sc.conf.get("spark.driver.host"),
    sc.conf.get("spark.driver.port").toInt,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  // When testing, expose the parent class path to the child. This is processed by
  // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
  // when the assembly is built with the *-provided profiles enabled.
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  // If we're using dynamic allocation, set our initial executor limit to 0 for now.
  // ExecutorAllocationManager will send the real initial limit to the Master later.
  val initialExecutorLimit =
    if (Utils.isDynamicAllocationEnabled(conf)) {
      Some(0)
    } else {
      None
    }
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
  client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start()
  launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  waitForRegistration()
  launcherBackend.setState(SparkAppHandle.State.RUNNING)
}

Now let's look at what client.start() does:

  def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

endpoint here is an AtomicReference. rpcEnv.setupEndpoint does two things: it registers an endpoint and returns a ref to it. But where is the "start" in all of this? Execution must somehow end up in ClientEndpoint's onStart method, so how does it get there?

The following code came up in the earlier article on the RPC mechanism; the key line (receivers.offer(data)) did not get much attention at the time. It turns out that whenever an endpoint is registered with the RpcEnv, its OnStart event is triggered automatically.

  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      val data = endpoints.get(name)
      endpointRefs.put(data.endpoint, data.ref)
      receivers.offer(data)  // for the OnStart message
    }
    endpointRef
  }
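
To make the mechanism concrete, here is a minimal, self-contained toy sketch (not Spark code) of the same idea: registration merely enqueues the endpoint, and a dispatch loop later delivers the OnStart event by calling onStart() on it.

  import java.util.concurrent.LinkedBlockingQueue

  object OnStartDemo extends App {
    trait ToyEndpoint { def onStart(): Unit }

    private val receivers = new LinkedBlockingQueue[ToyEndpoint]()

    // Plays the role of registerRpcEndpoint: store the endpoint and queue it for OnStart.
    def setupEndpoint(name: String, endpoint: ToyEndpoint): ToyEndpoint = {
      receivers.offer(endpoint) // same role as receivers.offer(data) above
      endpoint
    }

    // In Spark the Dispatcher's thread pool drains this queue; here we drain it inline.
    def dispatchOnce(): Unit = Option(receivers.poll()).foreach(_.onStart())

    setupEndpoint("AppClient", new ToyEndpoint {
      override def onStart(): Unit = println("onStart fired right after registration")
    })
    dispatchOnce()
  }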

Next, find ClientEndpoint (an inner class of StandaloneAppClient) and look at its onStart method:

  override def onStart(): Unit = {
    try {
      registerWithMaster(1)
    } catch {
      case e: Exception =>
        logWarning("Failed to connect to master", e)
        markDisconnected()
        stop()
    }
  }

registerWithMaster also calls itself recursively, but that recursion only exists to implement retries, so let's go straight to tryRegisterAllMasters().

  private def registerWithMaster(nthRetry: Int) {
    registerMasterFutures.set(tryRegisterAllMasters())
    registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
      override def run(): Unit = {
        if (registered.get) {
          registerMasterFutures.get.foreach(_.cancel(true))
          registerMasterThreadPool.shutdownNow()
        } else if (nthRetry >= REGISTRATION_RETRIES) {
          markDead("All masters are unresponsive! Giving up.")
        } else {
          registerMasterFutures.get.foreach(_.cancel(true))
          registerWithMaster(nthRetry + 1)
        }
      }
    }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
  }

  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    for (masterAddress <- masterRpcAddresses) yield {
      registerMasterThreadPool.submit(new Runnable {
        override def run(): Unit = try {
          if (registered.get) {
            return
          }
          logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
          val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          masterRef.send(RegisterApplication(appDescription, self))
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      })
    }
  }

The last two lines of the try block are the key ones: they look up the master's endpoint ref and send it a message. That master endpoint lives on the master node of the Spark cluster, so from the point of view of the driver's RpcEnv it is a remote endpoint.

Now open Master.scala and start with its class declaration:

private[deploy] class Master(
  override val rpcEnv: RpcEnv,
  address: RpcAddress,
  webUiPort: Int,
  val securityMgr: SecurityManager,
  val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {

Then find its receive method:

override def receive: PartialFunction[Any, Unit]

We only need to look at one case:

  case RegisterApplication(description, driver) =>
    // TODO Prevent repeated registrations from some driver
    if (state == RecoveryState.STANDBY) {
      // ignore, don't send response
    } else {
      logInfo("Registering app " + description.name)
      val app = createApplication(description, driver)
      registerApplication(app)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      persistenceEngine.addApplication(app)
      driver.send(RegisteredApplication(app.id, self))
      schedule()
    }

In order: 1. create the app, 2. register the app, 3. persist the app, 4. send a message to the driver's endpoint, 5. call schedule().

For step 4: driver is the endpoint ref that arrived together with the RPC message, and the master uses it to send the driver a response to the app registration.
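
The shape of these messages is what makes step 4 possible: the driver's own ref travels inside RegisterApplication (the self passed to masterRef.send above), so the master can answer without looking anything up. A simplified, hypothetical sketch (not the real Spark case classes):

  object MessageShapeSketch {
    // Hypothetical stand-ins for Spark's RpcEndpointRef and deploy messages.
    trait RpcEndpointRef { def send(message: Any): Unit }

    case class RegisterApplication(appDescription: String, driver: RpcEndpointRef)
    case class RegisteredApplication(appId: String, master: RpcEndpointRef)

    // The master-side handler replies through the ref carried in the message itself.
    def handle(msg: Any, self: RpcEndpointRef): Unit = msg match {
      case RegisterApplication(desc, driver) =>
        val appId = "app-00000000-0000" // placeholder id
        driver.send(RegisteredApplication(appId, self))
      case _ => // ignore everything else in this sketch
    }
  }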

Back in ClientEndpoint.receive:

  override def receive: PartialFunction[Any, Unit] = {
    case RegisteredApplication(appId_, masterRef) =>
      // FIXME How to handle the following cases?
      // 1. A master receives multiple registrations and sends back multiple
      // RegisteredApplications due to an unstable network.
      // 2. Receive multiple RegisteredApplication from different masters because the master is
      // changing.
      appId.set(appId_)
      registered.set(true)
      master = Some(masterRef)
      listener.connected(appId.get)

Now step 5; let's see what schedule() does:

  private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Drivers take strict precedence over executors
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    startExecutorsOnWorkers()
  }

launchDriver(worker, driver) can be understood as starting the current driver on that worker (the worker spawns a thread to run it).
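
launchDriver itself is not quoted above. Roughly, as a paraphrased sketch (not necessarily the exact source), it records the driver on the WorkerInfo and then sends the worker a LaunchDriver message; the worker in turn starts a DriverRunner thread for it:

  private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    worker.addDriver(driver)          // master-side bookkeeping
    driver.worker = Some(worker)
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))  // tell the worker to start it
    driver.state = DriverState.RUNNING
  }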

Now take a look at startExecutorsOnWorkers():

  private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    for (app <- waitingApps if app.coresLeft > 0) {
      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
      // Filter out workers that don't have enough resources to launch an executor
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor.getOrElse(1))
        .sortBy(_.coresFree).reverse
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
      }
    }
  }

  private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
      val exec = app.addExecutor(worker, coresToAssign)
      launchExecutor(worker, exec)
      app.state = ApplicationState.RUNNING
    }
  }

  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }

1. Add the exec to the local WorkerInfo bookkeeping.

2. Tell the worker to launch an executor.

3. Tell the driver that an executor has been added.

That concludes the walkthrough of how the Spark Driver starts up. Thanks for reading.
