共計 2688 個字符,預計需要花費 7 分鐘才能閱讀完成。
本篇內容主要講解“Driver 容錯安全性是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓丸趣 TV 小編來帶大家學習“Driver 容錯安全性是什么”吧!
從數據層面,ReceivedBlockTracker 為整個 Spark Streaming 應用程序記錄元數據信息。
從調度層面,DStreamGraph 和 JobGenerator 是 Spark Streaming 調度的核心,記錄當前調度到哪一進度,和業務有關。
ReceivedBlockTracker 在接收到元數據信息后調用 addBlock 方法,先寫入磁盤中,然后在寫入內存中。
根據 batchTime 分配屬于當前 BatchDuration 要處理的數據到 timToAllocatedBlocks 數據結構中。
Time 類的是一個 case class,記錄時間,重載了操作符,隱式轉換,值得借鑒。
case class Time(private val millis: Long) {
def milliseconds: Long = millis
def (that: Time): Boolean = (this.millis that.millis)
def = (that: Time): Boolean = (this.millis = that.millis)
def (that: Time): Boolean = (this.millis that.millis)
def = (that: Time): Boolean = (this.millis = that.millis)
def + (that: Duration): Time = new Time(millis + that.milliseconds)
def – (that: Time): Duration = new Duration(millis – that.millis)
def – (that: Duration): Time = new Time(millis – that.milliseconds)
// Java-friendlier versions of the above.
def less(that: Time): Boolean = this that
def lessEq(that: Time): Boolean = this = that
def greater(that: Time): Boolean = this that
def greaterEq(that: Time): Boolean = this = that
def plus(that: Duration): Time = this + that
def minus(that: Time): Duration = this – that
def minus(that: Duration): Time = this – that
def floor(that: Duration): Time = {
val t = that.milliseconds
new Time((this.millis / t) * t)
}
def floor(that: Duration, zeroTime: Time): Time = {
val t = that.milliseconds
new Time(((this.millis – zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
}
def isMultipleOf(that: Duration): Boolean =
(this.millis % that.milliseconds == 0)
def min(that: Time): Time = if (this that) this else that
def max(that: Time): Time = if (this that) this else that
def until(that: Time, interval: Duration): Seq[Time] = {
(this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
}
def to(that: Time, interval: Duration): Seq[Time] = {
(this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
}
override def toString: String = (millis.toString + ms)
}
object Time {
implicit val ordering = Ordering.by((time: Time) = time.millis)
}
跟蹤 Time 對象,ReceiverTracker 的 allocateBlocksToBatch 方法中的入參 batchTime 是被 JobGenerator 的 generateJobs 方法調用的。
JobGenerator 的 generateJobs 方法是被定時器發送 GenerateJobs 消息調用的。
GenerateJobs 中的時間參數就是 nextTime,而 nextTime+=period,這個 period 就是 ssc.graph.batchDuration.milliseconds。
nextTime 的初始值是在 start 方法中傳入的 startTime 賦值的,即 RecurringTimer 的 getStartTime 方法的返回值,是當前時間 period 的 (整數倍 +1)。
Period 這個值是我們調用 new StreamingContext 來構造 StreamingContext 時傳入的 Duration 值。
ReceivedBlockTracker 會清除過期的元數據信息,從 HashMap 中移除,也是先寫入磁盤,然后在寫入內存。
元數據的生成,消費和銷毀都有 WAL,所以失敗時就可以從日志中恢復。從源碼分析中得出只有設置了 checkpoint 目錄,才進行 WAL 機制。
對傳入的 checkpoint 目錄來創建日志目錄進行 WAL。
這里是在 checkpoint 目錄下創建文件夾名為 receivedBlockMetadata 的文件夾來保存 WAL 記錄的數據。
把當前的 DStream 和 JobGenerator 的狀態進行 checkpoint,該方法是在 generateJobs 方法最后通過發送 DoCheckpoint 消息,來調用的。
到此,相信大家對“Driver 容錯安全性是什么”有了更深的了解,不妨來實際操作一番吧!這里是丸趣 TV 網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!