久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何進(jìn)行JobScheduler內(nèi)幕實(shí)現(xiàn)和深度思考

共計(jì) 3982 個(gè)字符,預(yù)計(jì)需要花費(fèi) 10 分鐘才能閱讀完成。

本篇文章為大家展示了如何進(jìn)行 JobScheduler 內(nèi)幕實(shí)現(xiàn)和深度思考,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。

DStream 的 foreachRDD 方法,實(shí)例化 ForEachDStream 對(duì)象,并將用戶定義的函數(shù) foreachFunc 傳入到該對(duì)象中。foreachRDD 方法是輸出操作,foreachFunc 方法會(huì)作用到這個(gè) DStream 中的每個(gè) RDD。

/**
 * Apply a function to each RDD in this DStream. This is an output operator, so
 * this DStream will be registered as an output stream and therefore materialized.
 * @param foreachFunc foreachRDD function
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *  in the `foreachFunc` to be displayed in the UI. If `false`, then
 *  only the scopes and callsites of `foreachRDD` will override those
 *  of the RDDs on the display.
 */
private def foreachRDD(
 foreachFunc: (RDD[T], Time) = Unit,
 displayInnerRDDOps: Boolean): Unit = {
 new ForEachDStream(this,
 context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

ForEachDStream 對(duì)象中重寫了 generateJob 方法,調(diào)用父 DStream 的 getOrCompute 方法來(lái)生成 RDD 并封裝 Job,傳入對(duì)該 RDD 的操作函數(shù) foreachFunc 和 time。dependencies 方法定義為父 DStream 的集合。

/**
 * An internal DStream used to represent output operations like DStream.foreachRDD.
 * @param parent  Parent DStream
 * @param foreachFunc  Function to apply on each RDD generated by the parent DStream
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *  by `foreachFunc` will be displayed in the UI; only the scope and
 *  callsite of `DStream.foreachRDD` will be displayed.
 */
private[streaming]
class ForEachDStream[T: ClassTag] (
 parent: DStream[T],
 foreachFunc: (RDD[T], Time) = Unit,
 displayInnerRDDOps: Boolean
 ) extends DStream[Unit](parent.ssc) {

 override def dependencies: List[DStream[_]] = List(parent)

 override def slideDuration: Duration = parent.slideDuration

 override def compute(validTime: Time): Option[RDD[Unit]] = None

 override def generateJob(time: Time): Option[Job] = {
 parent.getOrCompute(time) match {
 case Some(rdd) =
 val jobFunc = () = createRDDWithLocalProperties(time, displayInnerRDDOps) {
 foreachFunc(rdd, time)
 }
 Some(new Job(time, jobFunc))
 case None = None
 }
 }
}

DStreamGraph 的 generateJobs 方法中會(huì)調(diào)用 outputStream 的 generateJob 方法,就是調(diào)用 ForEachDStream 的 generateJob 方法。

def generateJobs(time: Time): Seq[Job] = {
 logDebug(Generating jobs for time + time)
 val jobs = this.synchronized {
 outputStreams.flatMap {outputStream =
 val jobOption = outputStream.generateJob(time)
 jobOption.foreach(_.setCallSite(outputStream.creationSite))
 jobOption
 }
 }
 logDebug(Generated + jobs.length + jobs for time + time)
 jobs
}

DStream 的 generateJob 定義如下,其子類中只有 ForEachDStream 重寫了 generateJob 方法。

/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
 getOrCompute(time) match {
 case Some(rdd) = {
 val jobFunc = () = {
 val emptyFunc = {(iterator: Iterator[T]) = {}}
 context.sparkContext.runJob(rdd, emptyFunc)
 }
 Some(new Job(time, jobFunc))
 }
 case None = None
 }
}

DStream 的 print 方法內(nèi)部還是調(diào)用 foreachRDD 來(lái)實(shí)現(xiàn),傳入了內(nèi)部方法 foreachFunc,來(lái)取出 num+ 1 個(gè)數(shù)后打印輸出。

/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {
 def foreachFunc: (RDD[T], Time) = Unit = {
 (rdd: RDD[T], time: Time) = {
 val firstNum = rdd.take(num + 1)
 // scalastyle:off println
 println(——————————————-)
 println(Time: + time)
 println(——————————————-)
 firstNum.take(num).foreach(println)
 if (firstNum.length num) println(…)
 println()
 // scalastyle:on println
 }
 }
 foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}

總結(jié):JobScheduler 是 SparkStreaming 所有 Job 調(diào)度的中心,內(nèi)部有兩個(gè)重要的成員:

JobGenerator 負(fù)責(zé) Job 的生成,ReceiverTracker 負(fù)責(zé)記錄輸入的數(shù)據(jù)源信息。

JobScheduler 的啟動(dòng)會(huì)導(dǎo)致 ReceiverTracker 和 JobGenerator 的啟動(dòng)。ReceiverTracker 的啟動(dòng)導(dǎo)致運(yùn)行在 Executor 端的 Receiver 啟動(dòng)并且接收數(shù)據(jù),ReceiverTracker 會(huì)記錄 Receiver 接收到的數(shù)據(jù) meta 信息。JobGenerator 的啟動(dòng)導(dǎo)致每隔 BatchDuration,就調(diào)用 DStreamGraph 生成 RDD Graph,并生成 Job。JobScheduler 中的線程池來(lái)提交封裝的 JobSet 對(duì)象 (時(shí)間值,Job,數(shù)據(jù)源的 meta)。Job 中封裝了業(yè)務(wù)邏輯,導(dǎo)致最后一個(gè) RDD 的 action 被觸發(fā),被 DAGScheduler 真正調(diào)度在 Spark 集群上執(zhí)行該 Job。

上述內(nèi)容就是如何進(jìn)行 JobScheduler 內(nèi)幕實(shí)現(xiàn)和深度思考,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注丸趣 TV 行業(yè)資訊頻道。

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-17發(fā)表,共計(jì)3982字。
轉(zhuǎn)載說(shuō)明:除特殊說(shuō)明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請(qǐng)注明出處。
評(píng)論(沒(méi)有評(píng)論)
主站蜘蛛池模板: 合川市| 靖安县| 海城市| 长宁区| 宣威市| 景洪市| 呈贡县| 姜堰市| 衡山县| 翼城县| 望谟县| 肥城市| 于都县| 汽车| 汉寿县| 鄂温| 胶州市| 文化| 墨玉县| 婺源县| 沙湾县| 兴和县| 饶平县| 曲麻莱县| 淮北市| 永丰县| 山东省| 柳林县| 河东区| 武功县| 吉木萨尔县| 涟水县| 怀宁县| 西宁市| 加查县| 新乡市| 黄山市| 仁寿县| 江陵县| 通辽市| 石城县|