共計 5569 個字符,預計需要花費 14 分鐘才能閱讀完成。
這篇文章主要介紹“Driver 容錯安全性怎么實現”,在日常操作中,相信很多人在 Driver 容錯安全性怎么實現問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Driver 容錯安全性怎么實現”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!
· 第一、看 ReceiverTracker 的容錯,主要是 ReceiverTracker 接收元數據的進入 WAL, 看 ReceiverTracker 的 addBlock 方法,代碼如下
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
try {
val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
if (writeResult) {
synchronized {
getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
}
logDebug(s Stream ${receivedBlockInfo.streamId} received +
s block ${receivedBlockInfo.blockStoreResult.blockId} )
} else {
logDebug(s Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving +
s block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log. )
}
writeResult
} catch {
case NonFatal(e) =
logError(s Error adding block $receivedBlockInfo , e)
false
}
}
writeToLog 方法就是進行 WAL 的操作,看 writeToLog 的代碼
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
if (isWriteAheadLogEnabled) {
logTrace(s Writing record: $record)
try {
writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
clock.getTimeMillis())
true
} catch {
case NonFatal(e) =
logWarning(s Exception thrown while writing record: $record to the WriteAheadLog. , e)
false
}
} else {
true
}
}
首先判斷是否開啟了 WAL,根據一下 isWriteAheadLogEnabled 值
private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty
接著看 writeAheadLogOption
private val writeAheadLogOption = createWriteAheadLog()
再看 createWriteAheadLog() 方法
private def createWriteAheadLog(): Option[WriteAheadLog] = {
checkpointDirOption.map {checkpointDir =
val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
}
}
根據 checkpoint 的配置,獲取 checkpoint 的目錄,這里可以看出,checkpoint 可以有多個目錄。
寫完 WAL 才將 receivedBlockInfo 放到內存隊列 getReceivedBlockQueue 中
· 第二、看 ReceivedBlockTracker 的 allocateBlocksToBatch 方法,代碼如下
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
if (lastAllocatedBatchTime == null || batchTime lastAllocatedBatchTime) {
val streamIdToBlocks = streamIds.map {streamId =
(streamId, getReceivedBlockQueue(streamId).dequeueAll(x = true))
}.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
lastAllocatedBatchTime = batchTime
} else {
logInfo(s Possibly processed batch $batchTime need to be processed again in WAL recovery)
}
} else {
// This situation occurs when:
// 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
// possibly processed batch job or half-processed batch job need to be processed again,
// so the batchTime will be equal to lastAllocatedBatchTime.
// 2. Slow checkpointing makes recovered batch time older than WAL recovered
// lastAllocatedBatchTime.
// This situation will only occurs in recovery time.
logInfo(s Possibly processed batch $batchTime need to be processed again in WAL recovery)
}
}
首先從 getReceivedBlockQueue 中獲取每一個 receiver 的 ReceivedBlockQueue 隊列賦值給 streamIdToBlocks,然后包裝一下
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
allocatedBlocks 就是根據時間獲取的一批元數據,交給對應 batchDuration 的 job,job 在執行的時候就可以使用,在使用前先進行 WAL,如果 job 出錯恢復后,可以知道數據計算到什么位置
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
lastAllocatedBatchTime = batchTime
} else {
logInfo(s Possibly processed batch $batchTime need to be processed again in WAL recovery)
}
· 第三、看 cleanupOldBatches 方法,cleanupOldBatches 的功能是從內存中清楚不用的 batches 元數據,再刪除 WAL 的數據,再刪除之前把要刪除的 batches 信息也進行 WAL
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
require(cleanupThreshTime.milliseconds clock.getTimeMillis())
val timesToCleanup = timeToAllocatedBlocks.keys.filter {_ cleanupThreshTime}.toSeq
logInfo(Deleting batches + timesToCleanup)
if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
timeToAllocatedBlocks –= timesToCleanup
writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
} else {
logWarning(Failed to acknowledge batch clean up in the Write Ahead Log.)
}
}
· 總結一下上面的三種 WAL, 對應下面的三種事件,這就是 ReceiverTracker 的容錯
/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
private[streaming] sealed trait ReceivedBlockTrackerLogEvent
private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchCleanupEvent(times: Seq[Time]) extends ReceivedBlockTrackerLogEvent
· 看一下 Dstream.graph 和 JobGenerator 的容錯,從開始
private def generateJobs(time: Time) {
SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
// allocate received blocks to batch
// 分配接收到的數據給 batch
jobScheduler.receiverTracker.allocateBlocksToBatch(time)
// 使用分配的塊生成 jobs
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =
// 獲取元數據信息
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
// 提交 jobSet
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =
jobScheduler.reportError(Error generating jobs for time + time, e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
jobs 生成完成后發送 DoCheckpoint 消息,最終調用 doCheckpoint 方法, 代碼如下
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
if (shouldCheckpoint (time – graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo(Checkpointing graph for time + time)
ssc.graph.updateCheckpointData(time)
checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
}
}
到此,關于“Driver 容錯安全性怎么實現”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!