
How ReceiverTracker Processes Data



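ReceiverTracker, running in the Driver, uses a specific scheduling algorithm to decide which executors the Receivers are launched on. The logic that starts a Receiver is wrapped in a task, and that task is the only task in its job; in other words, ReceiverTracker launches each Receiver as a separate job. The start-up logic creates a ReceiverSupervisorImpl, and calling its start method is what actually runs the Receiver on the worker node. From there, BlockGenerator groups the received data into blocks, ReceiverSupervisorImpl stores each block, and the block's metadata is then reported to ReceiverTracker.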

The rest of this article looks at what ReceiverTracker does with the data once it has been received.

ReceiverSupervisorImpl stores each block through its receivedBlockHandler, which is chosen when the supervisor is created:

private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    …
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}

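So there are two ways to store a block. If the receiver write-ahead log (WAL) is enabled (`spark.streaming.receiver.writeAheadLog.enable`), WriteAheadLogBasedBlockHandler is used, which stores the block in the BlockManager and also writes it to the WAL; otherwise BlockManagerBasedBlockHandler stores it in the BlockManager only.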
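The if/else above is a strategy choice made once, when the supervisor is built. The sketch below shows the same idea in isolation; `BlockStore`, `WalStore`, `MemoryStore` and `chooseStore` are hypothetical names rather than Spark APIs, the Boolean flag stands in for the receiver WAL setting, and the stores only return a description instead of persisting anything.

```scala
object HandlerChoiceSketch {
  // Hypothetical stand-in for Spark's ReceivedBlockHandler.
  sealed trait BlockStore {
    def storeBlock(blockId: String, data: Seq[String]): String
  }

  // Plays the role of WriteAheadLogBasedBlockHandler (nothing is really written).
  final class WalStore(checkpointDir: String) extends BlockStore {
    def storeBlock(blockId: String, data: Seq[String]): String =
      s"$blockId -> WAL under $checkpointDir (${data.size} records)"
  }

  // Plays the role of BlockManagerBasedBlockHandler.
  final class MemoryStore extends BlockStore {
    def storeBlock(blockId: String, data: Seq[String]): String =
      s"$blockId -> block manager (${data.size} records)"
  }

  // Decided once: the WAL branch needs a checkpoint directory, and this
  // sketch (an assumption, not Spark's exact behaviour) fails fast without one.
  def chooseStore(walEnabled: Boolean, checkpointDir: Option[String]): BlockStore =
    if (walEnabled)
      new WalStore(checkpointDir.getOrElse(
        throw new IllegalArgumentException("WAL needs a checkpoint directory")))
    else new MemoryStore
}
```

Because the choice happens at construction time, the per-block code path only calls `storeBlock` and never re-checks the configuration.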

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ): Unit = {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  // 1. Store the block through the WAL- or BlockManager-based handler
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  // 2. Report only the block's metadata to ReceiverTracker on the driver
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}

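pushAndReportBlock first stores the data and then reports to ReceiverTracker with an AddBlock message. Only the block's metadata (ReceivedBlockInfo) is sent, not the data itself: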

/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
    streamId: Int,
    numRecords: Option[Long],
    metadataOption: Option[Any],
    blockStoreResult: ReceivedBlockStoreResult
  )
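The store-then-report order can be sketched in a few lines. Everything here is hypothetical and simplified: `BlockMeta` plays the role of ReceivedBlockInfo, one in-memory map stands in for executor-side block storage, and one buffer stands in for the AddBlock messages the tracker receives.

```scala
import scala.collection.mutable

object StoreThenReportSketch {
  // Hypothetical analogue of ReceivedBlockInfo: metadata only, no records.
  final case class BlockMeta(streamId: Int, blockId: String, numRecords: Option[Long])

  // Executor side: where block payloads live (stand-in for receivedBlockHandler).
  val storage: mutable.Map[String, Seq[String]] = mutable.Map.empty
  // Driver side: what the tracker learns about (stand-in for AddBlock messages).
  val reported: mutable.ArrayBuffer[BlockMeta] = mutable.ArrayBuffer.empty

  def pushAndReport(streamId: Int, blockId: String, records: Seq[String]): BlockMeta = {
    storage(blockId) = records                                 // 1. store the data first
    val meta = BlockMeta(streamId, blockId, Some(records.size.toLong))
    reported += meta                                           // 2. then report metadata only
    meta
  }
}
```

Keeping the payload on the executor and sending only metadata keeps the driver's messages small regardless of block size.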

The `sealed` keyword means that all subclasses of a sealed trait or class must be defined in the same source file as it.
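As a small illustration, here is a hypothetical message hierarchy in the spirit of the tracker's messages (names and fields are simplified, not Spark's definitions). Because the trait is sealed, the compiler knows the complete set of cases and can warn when a match misses one.

```scala
object SealedMessagesSketch {
  // All subclasses of a sealed trait must live in this source file.
  sealed trait TrackerMessage
  final case class AddBlock(streamId: Int, blockId: String) extends TrackerMessage
  final case class CleanupOldBlocks(thresholdMs: Long) extends TrackerMessage
  final case class UpdateRateLimit(streamId: Int, eventsPerSecond: Long) extends TrackerMessage

  // Removing any of the three cases below makes the compiler emit a
  // "match may not be exhaustive" warning.
  def describe(msg: TrackerMessage): String = msg match {
    case AddBlock(id, block)       => s"stream $id added $block"
    case CleanupOldBlocks(t)       => s"clean blocks older than $t ms"
    case UpdateRateLimit(id, rate) => s"stream $id limited to $rate events/s"
  }
}
```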

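ReceiverTracker manages starting and stopping Receivers and receiving the metadata they report. An instance must be created only after all input streams have been added and StreamingContext.start() has been called, because ReceiverTracker launches one Receiver for each input stream and therefore needs the final set of input streams.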

ReceiverTracker holds all of the receiver input streams (the data sources) and their IDs:

private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
private val receiverInputStreamIds = receiverInputStreams.map {_.id }

The states of ReceiverTracker:

/** Enumeration to identify current state of the ReceiverTracker */
object TrackerState extends Enumeration {
  type TrackerState = Value
  val Initialized, Started, Stopping, Stopped = Value
}
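An Enumeration like TrackerState is typically paired with checks that reject out-of-order calls. The sketch below assumes a strictly forward lifecycle (Initialized, Started, Stopping, Stopped); the `Tracker` class and its `start`/`stop` methods are hypothetical and far simpler than the real ReceiverTracker.

```scala
object TrackerStateSketch {
  // Same shape as the TrackerState enumeration above.
  object TrackerState extends Enumeration {
    type TrackerState = Value
    val Initialized, Started, Stopping, Stopped = Value
  }
  import TrackerState._

  // Hypothetical tracker allowing only forward transitions.
  final class Tracker {
    @volatile private var state: TrackerState = Initialized
    def current: TrackerState = state

    // Refuse any transition that does not start from the expected state.
    private def move(from: TrackerState, to: TrackerState): Unit = {
      if (state != from) throw new IllegalStateException(s"cannot go from $state to $to")
      state = to
    }

    def start(): Unit = move(Initialized, Started)

    def stop(): Unit = {
      move(Started, Stopping)
      // a real tracker would stop receivers and release resources here
      move(Stopping, Stopped)
    }
  }
}
```

Calling `start()` twice, or `stop()` before `start()`, fails immediately instead of leaving the tracker half-initialized.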

Next, let's look at how ReceiverTracker handles the AddBlock message sent by ReceiverSupervisorImpl:

case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    // WAL batching: run addBlock on a pool thread and reply from there
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }

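It first checks whether WAL batching is enabled on the driver. If so, addBlock runs on a thread from walBatchingThreadPool and the reply is sent from that thread, because WAL writes are expensive: with batching, each write blocks until its batch has been flushed, and offloading it keeps the endpoint free to process other messages. Otherwise, addBlock is called and its result is replied directly.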
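The dispatch decision can be sketched with a plain Java executor. `ReplyOffloadSketch`, `handleAddBlock` and the `reply` callback are hypothetical: the callback stands in for `context.reply`, and a cached thread pool stands in for walBatchingThreadPool.

```scala
import java.util.concurrent.{ExecutorService, Executors}

object ReplyOffloadSketch {
  // Hypothetical pool playing the role of walBatchingThreadPool.
  val pool: ExecutorService = Executors.newCachedThreadPool()

  // `addBlock` stands in for the possibly blocking WAL write,
  // `reply` for sending the answer back to the receiver.
  def handleAddBlock(batchingEnabled: Boolean, addBlock: () => Boolean, reply: Boolean => Unit): Unit =
    if (batchingEnabled) {
      // Slow path runs on a pool thread; the caller returns immediately
      // and can go on handling the next message.
      pool.execute(new Runnable { override def run(): Unit = reply(addBlock()) })
    } else {
      reply(addBlock()) // cheap path: answer inline
    }
}
```

The caller cannot assume the reply has been sent when `handleAddBlock` returns in the batching case, which is why the real code replies from inside the pool task.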

addBlock then hands the work to receivedBlockTracker:

/** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}

ReceivedBlockTracker manages block information (ReceivedBlockInfo) on the Driver side.

/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      // Only record the block in memory once the (optional) WAL write succeeded
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}

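writeToLog is simple: if the WAL is enabled, it writes the block information to the log so the data can be recovered later, returning false if the write fails; otherwise it just returns true. When writeToLog succeeds, addBlock puts the block information into streamIdToUnallocatedBlockQueues: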

private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]

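This data structure is a neat design: the key is the stream ID and the value is a queue, so the block information for each stream is kept separately. This gives ReceivedBlockTracker a view of every block, from every stream, that has been received but not yet allocated to a batch.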
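A minimal sketch of the per-stream queue map, plus a batch-allocation step in the spirit of what ReceivedBlockTracker does later. `BlockInfo`, `add` and `allocateBatch` are hypothetical simplifications, not Spark's signatures.

```scala
import scala.collection.mutable

object UnallocatedQueuesSketch {
  // Hypothetical, simplified block metadata.
  final case class BlockInfo(streamId: Int, blockId: String)

  private type BlockQueue = mutable.Queue[BlockInfo]
  // key: stream id; value: blocks received for that stream, not yet in a batch
  private val streamIdToUnallocated = new mutable.HashMap[Int, BlockQueue]

  // Create a stream's queue lazily the first time it is needed.
  private def queueFor(streamId: Int): BlockQueue =
    streamIdToUnallocated.getOrElseUpdate(streamId, mutable.Queue.empty[BlockInfo])

  def add(info: BlockInfo): Unit = synchronized { queueFor(info.streamId) += info }

  // Move every pending block of each stream into one batch, emptying the queues.
  def allocateBatch(streamIds: Seq[Int]): Map[Int, Seq[BlockInfo]] = synchronized {
    streamIds.map(id => id -> queueFor(id).dequeueAll(_ => true).toSeq).toMap
  }
}
```

Because each stream has its own FIFO queue, a batch receives every stream's blocks in arrival order, and one slow stream never reorders another's blocks.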

/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    // WAL disabled: nothing to write, so treat it as a success
    true
  }
}
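writeToLog and addBlock together follow the usual write-ahead rule: an event is applied to in-memory state only after it has been logged, so state rebuilt from the log never contains anything the log lacks. The sketch below illustrates that rule and a replay-based recovery after a driver restart; the in-memory `Log` (which can be told to fail), `BlockAdded` and `recover` are hypothetical, not Spark's WriteAheadLog API.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object WriteAheadSketch {
  // Hypothetical event, standing in for BlockAdditionEvent.
  final case class BlockAdded(streamId: Int, blockId: String)

  // Hypothetical in-memory log; a real WAL would write files under the checkpoint dir.
  final class Log(var failWrites: Boolean = false) {
    val records = mutable.ArrayBuffer.empty[BlockAdded]
    def write(e: BlockAdded): Unit =
      if (failWrites) throw new java.io.IOException("log unavailable") else records += e
  }

  final class Tracker(log: Option[Log]) {
    val pending = mutable.ArrayBuffer.empty[BlockAdded]

    // Same contract as writeToLog: true if logging is off or the write
    // succeeds, false if the write throws.
    private def writeToLog(e: BlockAdded): Boolean = log match {
      case None => true
      case Some(l) =>
        try { l.write(e); true }
        catch { case NonFatal(_) => false }
    }

    // Same contract as addBlock: apply the event only once it is logged.
    def addBlock(e: BlockAdded): Boolean = {
      val ok = writeToLog(e)
      if (ok) synchronized { pending += e }
      ok
    }
  }

  // Recovery: rebuild in-memory state by replaying the logged events.
  def recover(log: Log): Tracker = {
    val t = new Tracker(Some(log))
    log.records.foreach(t.pending += _)
    t
  }
}
```

A failed write leaves the in-memory state untouched and returns false, which is what lets the caller treat the block as not acknowledged.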

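Now let's read the ReceivedBlockTracker class comment in detail. This class tracks all received blocks and allocates them to batches when required. All of its actions can be saved to the WAL, so if a checkpoint directory has been provided, the tracker's state (received blocks and block-to-batch allocations) can be recovered after a Driver failure. If an instance is created with a checkpoint directory, it reads previously saved events from the logs in that directory.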

/**
 * Class that keep track of all the received blocks, and allocate them to batches
 * when required. All actions taken by this class can be saved to a write ahead log
 * (if a checkpoint directory has been provided), so that the state of the tracker
 * (received blocks and block-to-batch allocations) can be recovered after driver failure.
 *
 * Note that when any instance of this class is created with a checkpoint directory,
 * it will try reading events from logs in the directory.
 */
private[streaming] class ReceivedBlockTracker(

Next, let's look at how ReceiverTracker handles the CleanupOldBlocks message:

case c: CleanupOldBlocks =>
  receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))

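On receiving this message, ReceiverTracker forwards it to every Receiver it manages that has a registered endpoint. When ReceiverSupervisorImpl receives it, it uses receivedBlockHandler to clean up blocks older than the given threshold time: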

private def cleanupOldBlocks(cleanupThreshTime: Time): Unit = {
  logDebug(s"Cleaning up blocks older then $cleanupThreshTime")
  receivedBlockHandler.cleanupOldBlocks(cleanupThreshTime.milliseconds)
}
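The fan-out on the driver and the threshold cleanup on the executors can be sketched together. `ReceiverEndpoint`, `TrackingInfo` and `broadcastCleanup` are hypothetical; a direct method call replaces the asynchronous `send`, and block ages are plain millisecond timestamps.

```scala
import scala.collection.mutable

object CleanupFanOutSketch {
  // Hypothetical executor-side endpoint: block id -> creation time in ms.
  final class ReceiverEndpoint {
    val blocks = mutable.Map.empty[String, Long]
    // Drop every block created before the threshold.
    def cleanupOldBlocks(thresholdMs: Long): Unit =
      blocks --= blocks.collect { case (id, createdMs) if createdMs < thresholdMs => id }
  }

  // Hypothetical per-receiver record; endpoint is None until the receiver registers.
  final case class TrackingInfo(endpoint: Option[ReceiverEndpoint])

  // Driver side: forward the threshold to every registered receiver.
  // flatMap over the Option skips receivers with no endpoint yet.
  def broadcastCleanup(infos: Map[Int, TrackingInfo], thresholdMs: Long): Int = {
    val endpoints = infos.values.flatMap(_.endpoint)
    endpoints.foreach(_.cleanupOldBlocks(thresholdMs))
    endpoints.size // number of receivers that were notified
  }
}
```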

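ReceiverTracker can also adjust the ingestion rate of a given stream at any time. When it receives an UpdateReceiverRateLimit message, it sends an UpdateRateLimit message to the corresponding ReceiverSupervisorImpl: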

case UpdateReceiverRateLimit(streamUID, newRate) =>
  for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
    eP.send(UpdateRateLimit(newRate))
  }
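The for-comprehension over two Options means the message is silently dropped when either lookup comes up empty: an unknown stream ID, or a receiver that has not registered an endpoint yet. A minimal sketch with hypothetical `Endpoint` and `TrackingInfo` types and a direct call in place of `send`:

```scala
import scala.collection.mutable

object RateRoutingSketch {
  // Hypothetical endpoint that records the rates it was sent.
  final class Endpoint { val received = mutable.ArrayBuffer.empty[Long] }
  final case class TrackingInfo(endpoint: Option[Endpoint])

  // Returns whether anything was sent: only if the stream is known
  // AND its receiver has an endpoint.
  def updateRate(infos: Map[Int, TrackingInfo], streamUID: Int, newRate: Long): Boolean = {
    var sent = false
    for (info <- infos.get(streamUID); ep <- info.endpoint) {
      ep.received += newRate
      sent = true
    }
    sent
  }
}
```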

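When ReceiverSupervisorImpl receives UpdateRateLimit, it passes the new rate to every registered BlockGenerator, whose updateRate method is inherited from RateLimiter: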

case UpdateRateLimit(eps) =>
  logInfo(s"Received a new rate limit: $eps.")
  registeredBlockGenerators.foreach { bg =>
    bg.updateRate(eps)
  }

/**
 * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
 * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
 *
 * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
 */
private[receiver] def updateRate(newRate: Long): Unit =
  if (newRate > 0) {
    if (maxRateLimit > 0) {
      rateLimiter.setRate(newRate.min(maxRateLimit))
    } else {
      rateLimiter.setRate(newRate)
    }
  }
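The clamping rule in updateRate is easy to check in isolation. `effectiveRate` below is a hypothetical pure function that returns the rate the limiter would be set to, or None when the update is ignored; it does not touch any real limiter.

```scala
object RateClampSketch {
  // newRate <= 0: ignored, as in updateRate.
  // maxRate <= 0: treated as "no cap" by the else-branch above.
  def effectiveRate(newRate: Long, maxRate: Long): Option[Long] =
    if (newRate > 0) Some(if (maxRate > 0) newRate.min(maxRate) else newRate)
    else None
}
```

For example, with `maxRate = 100` a requested rate of 500 events per second is capped to 100, while a request of 50 passes through unchanged.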

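ReceiverTracker follows the facade pattern: callers appear to be using ReceiverTracker's own functionality, but the work is actually done by other classes, such as ReceivedBlockTracker and the ReceiverSupervisorImpl endpoints.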
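A minimal sketch of the facade idea. `BlockBook`, `ReceiverRegistry` and `Tracker` are hypothetical stand-ins for ReceivedBlockTracker, the receiver endpoints and ReceiverTracker; the real classes do far more.

```scala
import scala.collection.mutable

object FacadeSketch {
  // Hypothetical collaborator that owns block bookkeeping.
  final class BlockBook {
    private val blocks = mutable.ArrayBuffer.empty[String]
    def addBlock(id: String): Boolean = { blocks += id; true }
    def count: Int = blocks.size
  }

  // Hypothetical collaborator that talks to the receivers.
  final class ReceiverRegistry {
    val cleanupsSent = mutable.ArrayBuffer.empty[Long]
    def broadcastCleanup(thresholdMs: Long): Unit = { cleanupsSent += thresholdMs }
  }

  // The facade: callers only see Tracker, which has no logic of its
  // own and delegates each call to the responsible collaborator.
  final class Tracker(book: BlockBook, registry: ReceiverRegistry) {
    def addBlock(id: String): Boolean = book.addBlock(id)
    def cleanupOldBlocks(thresholdMs: Long): Unit = registry.broadcastCleanup(thresholdMs)
  }
}
```

The benefit is that message handling stays in one place while each concern (block bookkeeping, receiver communication) can evolve, or be tested, separately.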


Copyright notice: original article by 丸趣, published 2023-08-16, 6543 characters in total.