
How Driver Fault Tolerance Is Implemented


This article looks at how Driver fault tolerance is implemented in Spark Streaming, walking through the relevant source code step by step.

·  First, the fault tolerance of the ReceiverTracker: the block metadata it receives is written to the WAL. This happens in the addBlock method (implemented in ReceivedBlockTracker, which ReceiverTracker delegates to), shown below:

def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}

The writeToLog method is what actually performs the WAL write; its code is as follows:

private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}

It first checks whether the WAL is enabled, based on the value of isWriteAheadLogEnabled:

private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty

Next, look at writeAheadLogOption:

private val writeAheadLogOption = createWriteAheadLog()

And then the createWriteAheadLog() method:

private def createWriteAheadLog(): Option[WriteAheadLog] = {
  checkpointDirOption.map { checkpointDir =>
    val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
    WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
  }
}

The WAL log directory is derived from the configured checkpoint directory, so the driver-side WAL is only created when a checkpoint directory has been set.
Note that receivedBlockInfo is put into the in-memory queue returned by getReceivedBlockQueue only after the WAL write has succeeded.
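To show what turns this machinery on from the application side, here is a minimal usage sketch (assumed application code, not from the article): setting a checkpoint directory is what makes checkpointDirOption non-empty in the code above; the receiver-side data WAL is a separate, commonly paired setting enabled via spark.streaming.receiver.writeAheadLog.enable. The application name and paths are just examples.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalEnabledApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wal-enabled-app")
      // Receiver-side data WAL; the driver-side metadata WAL discussed above only needs a checkpoint dir.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(5))
    // The WAL log directory is derived from this checkpoint directory
    // (ReceivedBlockTracker.checkpointDirToLogDir).
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")  // example path

    // ... define input DStreams and output operations here ...

    ssc.start()
    ssc.awaitTermination()
  }
}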

·  Second, the allocateBlocksToBatch method of ReceivedBlockTracker, shown below:

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}

It first drains each receiver's ReceivedBlockQueue (via getReceivedBlockQueue) into streamIdToBlocks, and then wraps the result:

val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

allocatedBlocks is the batch of block metadata selected for this batch time. It is handed to the job of the corresponding batchDuration so the job can use it during execution. Before it is used, the allocation itself is written to the WAL, so that if the job fails and the driver recovers, it knows exactly how far the data had been processed:

val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
  timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
  lastAllocatedBatchTime = batchTime
} else {
  logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
}
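To make the hand-off concrete, here is a minimal, self-contained sketch of the idea (all names are simplified stand-ins, not Spark's internal API): the allocation is keyed by batch time, and the job generated for that batch later looks up exactly the blocks it was given.

import scala.collection.mutable

object BatchAllocationSketch {
  // Simplified stand-ins for Spark's block metadata and allocation types.
  case class BlockInfo(blockId: String)
  case class AllocatedBlocks(streamIdToBlocks: Map[Int, Seq[BlockInfo]])

  private val timeToAllocatedBlocks = mutable.HashMap[Long, AllocatedBlocks]()

  // Done by the tracker at batch-generation time (after the WAL write succeeded).
  def allocate(batchTime: Long, blocks: Map[Int, Seq[BlockInfo]]): Unit =
    timeToAllocatedBlocks.put(batchTime, AllocatedBlocks(blocks))

  // Done by the job for `batchTime` when it runs: it consumes only its own allocation.
  def blocksOfBatch(batchTime: Long): Map[Int, Seq[BlockInfo]] =
    timeToAllocatedBlocks.get(batchTime).map(_.streamIdToBlocks).getOrElse(Map.empty)

  def main(args: Array[String]): Unit = {
    allocate(1000L, Map(0 -> Seq(BlockInfo("input-0-0"), BlockInfo("input-0-1"))))
    println(blocksOfBatch(1000L)) // Map(0 -> List(BlockInfo(input-0-0), BlockInfo(input-0-1)))
  }
}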

·  Third, the cleanupOldBatches method. Its job is to remove the metadata of batches that are no longer needed from memory and then delete the corresponding WAL data; before anything is deleted, the information about the batches being cleaned up is itself written to the WAL.

def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
  require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
  val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
  logInfo("Deleting batches " + timesToCleanup)
  if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
    timeToAllocatedBlocks --= timesToCleanup
    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
  } else {
    logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
  }
}

·  To summarize: the three WAL writes above correspond to the three events below. This is how the ReceiverTracker achieves fault tolerance.

/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
private[streaming] sealed trait ReceivedBlockTrackerLogEvent

private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
  extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
  extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchCleanupEvent(times: Seq[Time])
  extends ReceivedBlockTrackerLogEvent
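On driver restart these events are read back from the WAL to rebuild the tracker's in-memory state. Below is a minimal, self-contained sketch of that replay idea (all names are simplified stand-ins, not Spark's actual recovery code, which ReceivedBlockTracker runs internally when recovery is enabled).

import scala.collection.mutable

object RecoverySketch {
  // Simplified stand-ins for the real event and metadata types.
  sealed trait LogEvent
  case class BlockAdditionEvent(streamId: Int, blockId: String) extends LogEvent
  case class BatchAllocationEvent(batchTime: Long, blocks: Map[Int, Seq[String]]) extends LogEvent
  case class BatchCleanupEvent(batchTimes: Seq[Long]) extends LogEvent

  def replay(events: Seq[LogEvent]): Unit = {
    val unallocated = mutable.Map[Int, mutable.Queue[String]]()
    val timeToAllocatedBlocks = mutable.Map[Long, Map[Int, Seq[String]]]()

    events.foreach {
      case BlockAdditionEvent(streamId, blockId) =>
        // Re-add the block to the per-stream queue of not-yet-allocated blocks.
        unallocated.getOrElseUpdate(streamId, mutable.Queue()) += blockId
      case BatchAllocationEvent(batchTime, blocks) =>
        // Re-create the batch allocation and drain the corresponding queued blocks.
        timeToAllocatedBlocks(batchTime) = blocks
        blocks.keys.foreach(unallocated.getOrElseUpdate(_, mutable.Queue()).clear())
      case BatchCleanupEvent(batchTimes) =>
        // Drop allocations for batches that were already cleaned up before the failure.
        batchTimes.foreach(timeToAllocatedBlocks.remove)
    }
    println(s"Recovered ${timeToAllocatedBlocks.size} batch allocation(s)")
  }
}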

·  Now look at the fault tolerance of the DStream graph and the JobGenerator, starting from the generateJobs method:

private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // allocate received blocks to this batch
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    // generate jobs using the allocated blocks
    graph.generateJobs(time)
  } match {
    case Success(jobs) =>
      // get the input metadata for this batch
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // submit the JobSet
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

After the jobs have been generated, a DoCheckpoint message is posted, which eventually invokes the doCheckpoint method:

private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
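The checkpoint written here is what lets the driver come back after a failure. A typical application-side usage sketch (assumed, not from the article) is StreamingContext.getOrCreate, which rebuilds the context and the DStream graph from the checkpoint directory on restart; the application name and path below are just examples.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableApp {
  val checkpointDir = "hdfs:///tmp/streaming-checkpoint"  // example path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("recoverable-app")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)
    // ... define input DStreams and output operations here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint if it exists, otherwise create a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}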

 

This concludes the walkthrough of Driver fault tolerance: block metadata, batch allocation, and batch cleanup are all written to the WAL on the driver, and the periodic checkpoint allows the DStream graph and JobGenerator state to be restored after a failure.
