久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

ReceiverSupervisorImpl實例化怎么實現

137次閱讀
沒有評論

共計 8921 個字符,預計需要花費 23 分鐘才能閱讀完成。

這篇文章主要介紹“ReceiverSupervisorImpl 實例化怎么實現”,在日常操作中,相信很多人在 ReceiverSupervisorImpl 實例化怎么實現問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”ReceiverSupervisorImpl 實例化怎么實現”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!

先回顧下 在 Executor 執行的具體的方法

實例化 ReceiverSupervisorImpl

start 之后等待 awaitTermination

// ReceiverTracker.scala line 564
val startReceiverFunc: Iterator[Receiver[_]] =  Unit =
 (iterator: Iterator[Receiver[_]]) =  { if (!iterator.hasNext) {
 throw new SparkException(  Could not start receiver as object not found.)
 }
 if (TaskContext.get().attemptNumber() == 0) { val receiver = iterator.next()
 assert(iterator.hasNext == false)
 val supervisor = new ReceiverSupervisorImpl( receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
 supervisor.start()
 supervisor.awaitTermination()
 } else {
 // It s restarted by TaskScheduler, but we want to reschedule it again. So exit it.
 }
 }

看下 ReceiverSupervisorImpl 的父類  ReceiverSupervisor 的構造。

成員變量賦值、將當前 supervisor 與 receiver 關聯 (  receiver.attachSupervisor(this) )

注釋也很清晰:在 Worker 上負責監督 Receiver。提供所需所有 處理從 receiver 接收到的數據 的接口

// ReceiverSupervisor.scala line 31
 * Abstract class that is responsible for supervising a Receiver in the worker.
 * It provides all the necessary interfaces for handling the data received by the receiver.
 */
private[streaming] abstract class ReceiverSupervisor( receiver: Receiver[_],
 conf: SparkConf
 ) extends Logging {
 /** Enumeration to identify current state of the Receiver */
 object ReceiverState extends Enumeration {
 type CheckpointState = Value
 val Initialized, Started, Stopped = Value
 }
 import ReceiverState._
 // Attach the supervisor to the receiver
 receiver.attachSupervisor(this) //  將 receiver 與 supervisor 關聯
 private val futureExecutionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool( receiver-supervisor-future , 128))
 /** Receiver id */
 protected val streamId = receiver.streamId
 /** Has the receiver been marked for stop. */
 private val stopLatch = new CountDownLatch(1)
 /** Time between a receiver is stopped and started again */
 private val defaultRestartDelay = conf.getInt(spark.streaming.receiverRestartDelay , 2000)
 /** The current maximum rate limit for this receiver. */
 private[streaming] def getCurrentRateLimit: Long = Long.MaxValue
 /** Exception associated with the stopping of the receiver */
 @volatile protected var stoppingError: Throwable = null
 /** State of the receiver */
 @volatile private[streaming] var receiverState = Initialized
 //  一些方法,其實就是   數據處理接口
}

ReceiverSupervisorImpl 的實例化

實例化了  BlockManagerBasedBlockHandler,用于將數據發送到 BlockManager

實例化 RpcEndpoint

實例化 BlockGenerator 

實例化  BlockGeneratorListener 監聽器

// ReceiverSupervisorImpl.scala line 43
 * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
 * which provides all the necessary functionality for handling the data received by
 * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
 * object that is used to divide the received data stream into blocks of data.
 */
private[streaming] class ReceiverSupervisorImpl( receiver: Receiver[_],
 env: SparkEnv,
 hadoopConf: Configuration,
 checkpointDirOption: Option[String]
 ) extends ReceiverSupervisor(receiver, env.conf) with Logging {
 private val host = SparkEnv.get.blockManager.blockManagerId.host
 private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
 private val receivedBlockHandler: ReceivedBlockHandler = { if (WriteAheadLogUtils.enableReceiverLog(env.conf)) { //  默認是不開啟
 if (checkpointDirOption.isEmpty) {
 throw new SparkException(
  Cannot enable receiver write-ahead log without checkpoint directory set.   +
  Please use streamingContext.checkpoint() to set the checkpoint directory.   +
  See documentation for more details. )
 }
 new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
 receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
 } else { new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel) 
 }
 }
 /** Remote RpcEndpointRef for the ReceiverTracker */
 private val trackerEndpoint = RpcUtils.makeDriverRef(ReceiverTracker , env.conf, env.rpcEnv)
 /** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
 private val endpoint = env.rpcEnv.setupEndpoint(  Receiver-  + streamId +  -  + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
 override val rpcEnv: RpcEnv = env.rpcEnv
 override def receive: PartialFunction[Any, Unit] = {
 case StopReceiver = 
 logInfo(Received stop signal)
 ReceiverSupervisorImpl.this.stop(Stopped by driver , None)
 case CleanupOldBlocks(threshTime) = 
 logDebug(Received delete old batch signal)
 cleanupOldBlocks(threshTime)
 case UpdateRateLimit(eps) = 
 logInfo(s Received a new rate limit: $eps.)
 registeredBlockGenerators.foreach { bg = 
 bg.updateRate(eps)
 }
 }
 })
 /** Unique block ids if one wants to add blocks directly */
 private val newBlockId = new AtomicLong(System.currentTimeMillis())
 private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator] //  典型的面包模式
 with mutable.SynchronizedBuffer[BlockGenerator]
 /** Divides received data records into data blocks for pushing in BlockManager. */
 private val defaultBlockGeneratorListener = new BlockGeneratorListener { def onAddData(data: Any, metadata: Any): Unit = { }
 def onGenerateBlock(blockId: StreamBlockId): Unit = { }
 def onError(message: String, throwable: Throwable) { reportError(message, throwable)
 }
 def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { pushArrayBuffer(arrayBuffer, None, Some(blockId))
 }
 }
 private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)
 // ...  一些方法
 /** Store an ArrayBuffer of received data as a data block into Spark s memory. */
def pushArrayBuffer( arrayBuffer: ArrayBuffer[_],
 metadataOption: Option[Any],
 blockIdOption: Option[StreamBlockId]
 ) { pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
/** Store block and report it to driver */
def pushAndReportBlock(
 receivedBlock: ReceivedBlock,
 metadataOption: Option[Any],
 blockIdOption: Option[StreamBlockId]
 ) { val blockId = blockIdOption.getOrElse(nextBlockId)
 val time = System.currentTimeMillis
 val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
 logDebug(s Pushed block $blockId in ${(System.currentTimeMillis - time)} ms )
 val numRecords = blockStoreResult.numRecords
 val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
 trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
 logDebug(s Reported block $blockId)
}

看看 BlockGenerator

注釋很清晰,有兩個線程

周期性的   將上一批數據 作為一個 block,并新建下一個批次的數據;RecurringTimer 類,內部有 Thread

將數據 push 到 BlockManager

//
 * Generates batches of objects received by a
 * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
 * named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch of as a block,
 * the other to push the blocks into the block manager.
 *
 * Note: Do not create BlockGenerator instances directly inside receivers. Use
 * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
 */
private[streaming] class BlockGenerator(
 listener: BlockGeneratorListener,
 receiverId: Int,
 conf: SparkConf,
 clock: Clock = new SystemClock()
 ) extends RateLimiter(conf) with Logging{private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
 * The BlockGenerator can be in 5 possible states, in the order as follows.
 *
 * - Initialized: Nothing has been started
 * - Active: start() has been called, and it is generating blocks on added data.
 * - StoppedAddingData: stop() has been called, the adding of data has been stopped,
 * but blocks are still being generated and pushed.
 * - StoppedGeneratingBlocks: Generating of blocks has been stopped, but
 * they are still being pushed.
 * - StoppedAll: Everything has stopped, and the BlockGenerator object can be GCed.
 */
private object GeneratorState extends Enumeration {
 type GeneratorState = Value
 val Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value
import GeneratorState._
private val blockIntervalMs = conf.getTimeAsMs(spark.streaming.blockInterval ,  200ms)
require(blockIntervalMs   0, s spark.streaming.blockInterval  should be a positive value)
private val blockIntervalTimer =
 new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer,  BlockGenerator) //  周期性線程
private val blockQueueSize = conf.getInt(spark.streaming.blockQueueSize , 10)
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } //  負責將數據 push 的
@volatile private var currentBuffer = new ArrayBuffer[Any]
@volatile private var state = Initialized
//...
}

至此,ReceiverSupervisorImpl 實例化完成。不過,截至目前為止 Receiver 還未啟動。

到此,關于“ReceiverSupervisorImpl 實例化怎么實現”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計8921字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 肇源县| 武邑县| 十堰市| 上虞市| 平原县| 修武县| 盱眙县| 抚顺市| 太原市| 陇南市| 东兴市| 德清县| 庆安县| 鹰潭市| 琼中| 梓潼县| 林西县| 克什克腾旗| 杨浦区| 泸西县| 红桥区| 濮阳市| 永川市| 龙江县| 九龙坡区| 博湖县| 巢湖市| 枞阳县| 虞城县| 田阳县| 临夏县| 乐业县| 信阳市| 靖宇县| 明星| 盈江县| 济宁市| 肥西县| 阿拉善左旗| 柏乡县| 汕头市|