久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Driver容錯安全性是什么

153次閱讀
沒有評論

共計 2688 個字符,預計需要花費 7 分鐘才能閱讀完成。

本篇內容主要講解“Driver 容錯安全性是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓丸趣 TV 小編來帶大家學習“Driver 容錯安全性是什么”吧!

從數據層面,ReceivedBlockTracker 為整個 Spark Streaming 應用程序記錄元數據信息。

從調度層面,DStreamGraph 和 JobGenerator 是 Spark Streaming 調度的核心,記錄當前調度到哪一進度,和業務有關。

ReceivedBlockTracker 在接收到元數據信息后調用 addBlock 方法,先寫入磁盤中,然后在寫入內存中。

根據 batchTime 分配屬于當前 BatchDuration 要處理的數據到 timToAllocatedBlocks 數據結構中。

Time 類的是一個 case class,記錄時間,重載了操作符,隱式轉換,值得借鑒。

case class Time(private val millis: Long) {
 def milliseconds: Long = millis
 def (that: Time): Boolean = (this.millis that.millis)
 def = (that: Time): Boolean = (this.millis = that.millis)
 def (that: Time): Boolean = (this.millis that.millis)
 def = (that: Time): Boolean = (this.millis = that.millis)
 def + (that: Duration): Time = new Time(millis + that.milliseconds)
 def – (that: Time): Duration = new Duration(millis – that.millis)
 def – (that: Duration): Time = new Time(millis – that.milliseconds)
 // Java-friendlier versions of the above.
 def less(that: Time): Boolean = this that
 def lessEq(that: Time): Boolean = this = that
 def greater(that: Time): Boolean = this that
 def greaterEq(that: Time): Boolean = this = that
 def plus(that: Duration): Time = this + that
 def minus(that: Time): Duration = this – that
 def minus(that: Duration): Time = this – that
 def floor(that: Duration): Time = {
 val t = that.milliseconds
 new Time((this.millis / t) * t)
 }
 def floor(that: Duration, zeroTime: Time): Time = {
 val t = that.milliseconds
 new Time(((this.millis – zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
 }
 def isMultipleOf(that: Duration): Boolean =
 (this.millis % that.milliseconds == 0)
 def min(that: Time): Time = if (this that) this else that
 def max(that: Time): Time = if (this that) this else that
 def until(that: Time, interval: Duration): Seq[Time] = {
 (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
 }
 def to(that: Time, interval: Duration): Seq[Time] = {
 (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
 }
 override def toString: String = (millis.toString + ms)
}
object Time {
 implicit val ordering = Ordering.by((time: Time) = time.millis)
}

跟蹤 Time 對象,ReceiverTracker 的 allocateBlocksToBatch 方法中的入參 batchTime 是被 JobGenerator 的 generateJobs 方法調用的。

JobGenerator 的 generateJobs 方法是被定時器發送 GenerateJobs 消息調用的。

GenerateJobs 中的時間參數就是 nextTime,而 nextTime+=period,這個 period 就是 ssc.graph.batchDuration.milliseconds。

nextTime 的初始值是在 start 方法中傳入的 startTime 賦值的,即 RecurringTimer 的 getStartTime 方法的返回值,是當前時間 period 的 (整數倍 +1)。

Period 這個值是我們調用 new StreamingContext 來構造 StreamingContext 時傳入的 Duration 值。

Driver 容錯安全性是什么Driver 容錯安全性是什么

ReceivedBlockTracker 會清除過期的元數據信息,從 HashMap 中移除,也是先寫入磁盤,然后在寫入內存。

Driver 容錯安全性是什么

元數據的生成,消費和銷毀都有 WAL,所以失敗時就可以從日志中恢復。從源碼分析中得出只有設置了 checkpoint 目錄,才進行 WAL 機制。

Driver 容錯安全性是什么

對傳入的 checkpoint 目錄來創建日志目錄進行 WAL。

Driver 容錯安全性是什么

這里是在 checkpoint 目錄下創建文件夾名為 receivedBlockMetadata 的文件夾來保存 WAL 記錄的數據。

Driver 容錯安全性是什么

Driver 容錯安全性是什么

把當前的 DStream 和 JobGenerator 的狀態進行 checkpoint,該方法是在 generateJobs 方法最后通過發送 DoCheckpoint 消息,來調用的。

Driver 容錯安全性是什么Driver 容錯安全性是什么Driver 容錯安全性是什么

到此,相信大家對“Driver 容錯安全性是什么”有了更深的了解,不妨來實際操作一番吧!這里是丸趣 TV 網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計2688字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 韶关市| 鄂托克旗| 卢龙县| 泸西县| 乌拉特后旗| 南投县| 板桥市| 新乐市| 昌图县| 巴楚县| 巴中市| 丰镇市| 兴义市| 丰县| 荣昌县| 富源县| 开平市| 辽中县| 临漳县| 靖江市| 平昌县| 浪卡子县| 德化县| 阜南县| 湘阴县| 荣昌县| 廉江市| 安丘市| 敖汉旗| 灯塔市| 巴彦县| 固安县| 贺兰县| 冷水江市| 临泽县| 武陟县| 西青区| 略阳县| 仁化县| 呼图壁县| 九台市|