久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

DStream與RDD關系是什么

149次閱讀
沒有評論

共計 7991 個字符,預計需要花費 20 分鐘才能閱讀完成。

本篇內容主要講解“DStream 與 RDD 關系是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓丸趣 TV 小編來帶大家學習“DStream 與 RDD 關系是什么”吧!

RDD 是怎么生成的?RDD 依靠什么生成?RDD 生成的依據是什么?Spark Streaming 中 RDD 的執行是否和 Spark Core 中的 RDD 執行有所不同?運行之后我們對 RDD 怎么處理?

RDD 本身也是基本的對象,例如說 BatchInterval 為 1 秒,那么每一秒都會產生 RDD,內存中不能完全容納該對象。每個 BatchInterval 的作業執行完后,怎么對已有的 RDD 進行管理。

ForEachDStream 不一定會觸發 Job 的執行,和 Job 的執行沒有關系。

Job 的產生是由 Spark Streaming 框架造成的。

foreachRDD 是 Spark Streaming 的后門,可以直接對 RDD 進行操作。

DStream 就是 RDD 的模板,后面的 DStream 與前面的 DStream 有依賴。

val lines = jsc.socketTextStream(127.0.0.1 , 9999) 這里產生了 SocketInputDStream。

lines.flatMap(_.split()).map(word = (word, 1)).reduceByKey(_ + _).print() 這里由 SocketInputDStream 轉換為 FlatMappedDStream,再轉換為 MappedDStream,再轉換為 ShuffledDStream,再轉換為 ForEachDStream。

對于 DStream 類,源碼中是這樣解釋的。

* DStreams internally is characterized by a few basic properties:
*  – A list of other DStreams that the DStream depends on
*  – A time interval at which the DStream generates an RDD
*  – A function that is used to generate an RDD after each time interval

大致意思是:

1.DStream 依賴于其他 DStream。

2. 每隔 BatchDuration,DStream 生成一個 RDD

3. 每隔 BatchDuration,DStream 內部函數會生成 RDD

DStream 是從后往前依賴,因為 DStream 代表 Spark Streaming 業務邏輯,RDD 是從后往前依賴的,DStream 是 lazy 級別的。DStream 的依賴關系必須和 RDD 的依賴關系保持高度一致。

DStream 類中 generatedRDDs 存儲著不同時間對應的 RDD 實例。每一個 DStream 實例都有自己的 generatedRDDs。實際運算的時候,由于是從后往前推,計算只作用于最后一個 DStream。

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

generatedRDDs 是如何獲取的。DStream 的 getOrCompute 方法,先根據時間判斷 HashMap 中是否已存在該時間對應的 RDD,如果沒有則調用 compute 得到 RDD,并放入到 HashMap 中。

/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
 // If RDD was already generated, then retrieve it from HashMap,
 // or else compute the RDD
 generatedRDDs.get(time).orElse {
 // Compute the RDD if time is valid (e.g. correct time in a sliding window)
 // of RDD generation, else generate nothing.
 if (isTimeValid(time)) {

 val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
 // Disable checks for existing output directories in jobs launched by the streaming
 // scheduler, since we may need to write output to an existing directory during checkpoint
 // recovery; see SPARK-4835 for more details. We need to have this call here because
 // compute() might cause Spark jobs to be launched.
 PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
 compute(time)
 }
 }

 rddOption.foreach {case newRDD =
 // Register the generated RDD for caching and checkpointing
 if (storageLevel != StorageLevel.NONE) {
 newRDD.persist(storageLevel)
 logDebug(s Persisting RDD ${newRDD.id} for time $time to $storageLevel )
 }
 if (checkpointDuration != null (time – zeroTime).isMultipleOf(checkpointDuration)) {
 newRDD.checkpoint()
 logInfo(s Marking RDD ${newRDD.id} for time $time for checkpointing )
 }
 generatedRDDs.put(time, newRDD)
 }
 rddOption
 } else {
 None
 }
 }
}

拿 DStream 的子類 ReceiverInputDStream 來說明 compute 方法,內部調用了 createBlockRDD 這個方法。

/**
 * Generates RDDs with blocks received by the receiver of this stream. */
override def compute(validTime: Time): Option[RDD[T]] = {
 val blockRDD = {
 if (validTime graph.startTime) {
 // If this is called for any time before the start time of the context,
 // then this returns an empty RDD. This may happen when recovering from a
 // driver failure without any write ahead log to recover pre-failure data.
 new BlockRDD[T](ssc.sc, Array.empty)
 } else {
 // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
 // for this batch
 val receiverTracker = ssc.scheduler.receiverTracker
 val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

 // Register the input blocks information into InputInfoTracker
 val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
 ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

 // Create the BlockRDD
 createBlockRDD(validTime, blockInfos)
 }
 }
 Some(blockRDD)
}

createBlockRDD 會返回 BlockRDD,由于 ReceiverInputDStream 沒有父依賴,所以自己生成 RDD。

private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
 if (blockInfos.nonEmpty) {
 val blockIds = blockInfos.map {_.blockId.asInstanceOf[BlockId] }.toArray

 // Are WAL record handles present with all the blocks
 val areWALRecordHandlesPresent = blockInfos.forall {_.walRecordHandleOption.nonEmpty}

 if (areWALRecordHandlesPresent) {
 // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
 val isBlockIdValid = blockInfos.map {_.isBlockIdValid() }.toArray
 val walRecordHandles = blockInfos.map {_.walRecordHandleOption.get}.toArray
 new WriteAheadLogBackedBlockRDD[T](
 ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
 } else {
 // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
 // others then that is unexpected and log a warning accordingly.
 if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
 if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
 logError(Some blocks do not have Write Ahead Log information; +
  this is unexpected and data may not be recoverable after driver failures )
 } else {
 logWarning(Some blocks have Write Ahead Log information; this is unexpected)
 }
 }
 val validBlockIds = blockIds.filter {id =
 ssc.sparkContext.env.blockManager.master.contains(id)
 }
 if (validBlockIds.size != blockIds.size) {
 logWarning(Some blocks could not be recovered as they were not found in memory. +
  To prevent such data loss, enabled Write Ahead Log (see programming guide +
  for more details. )
 }
 new BlockRDD[T](ssc.sc, validBlockIds)
 }
 } else {
 // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
 // according to the configuration
 if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
 new WriteAheadLogBackedBlockRDD[T](
 ssc.sparkContext, Array.empty, Array.empty, Array.empty)
 } else {
 new BlockRDD[T](ssc.sc, Array.empty)
 }
 }
}

再拿 DStream 的子類 MappedDStream 來說,這里的 compute 方法,是調用父 RDD 的 getOrCompute 方法獲得 RDD,再使用 map 操作。

private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
 parent: DStream[T],
 mapFunc: T = U
 ) extends DStream[U](parent.ssc) {

 override def dependencies: List[DStream[_]] = List(parent)

 override def slideDuration: Duration = parent.slideDuration

 override def compute(validTime: Time): Option[RDD[U]] = {
 parent.getOrCompute(validTime).map(_.map[U](mapFunc))
 }
}

從上面兩個 DStream 的子類,可以說明第一個 DStream,即 InputDStream 的 comput 方法是自己獲取數據并計算的,而其他的 DStream 是依賴父 DStream 的,調用父 DStream 的 getOrCompute 方法,然后進行計算。

以上說明了對 DStream 的操作最后作用于對 RDD 的操作。

接著看下 DStream 的另一個子類 ForEachDStream,發現其 compute 方法沒有任何操作,但是重寫了 generateJob 方法。

private[streaming]
class ForEachDStream[T: ClassTag] (
 parent: DStream[T],
 foreachFunc: (RDD[T], Time) = Unit,
 displayInnerRDDOps: Boolean
 ) extends DStream[Unit](parent.ssc) {

 override def dependencies: List[DStream[_]] = List(parent)

 override def slideDuration: Duration = parent.slideDuration

 override def compute(validTime: Time): Option[RDD[Unit]] = None

 override def generateJob(time: Time): Option[Job] = {
 parent.getOrCompute(time) match {
 case Some(rdd) =
 val jobFunc = () = createRDDWithLocalProperties(time, displayInnerRDDOps) {
 foreachFunc(rdd, time)
 }
 Some(new Job(time, jobFunc))
 case None = None
 }
 }
}

從 Job 生成入手,JobGenerator 的 generateJobs 方法,內部調用的 DStreamGraph 的 generateJobs 方法。

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
 // Set the SparkEnv in this thread, so that job generation code can access the environment
 // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
 // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
 SparkEnv.set(ssc.env)
 Try {
 // 根據特定的時間獲取具體的數據
 jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
 // 調用 DStreamGraph 的 generateJobs 生成 Job
 graph.generateJobs(time) // generate jobs using allocated block
 } match {
 case Success(jobs) =
 val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
 jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
 case Failure(e) =
 jobScheduler.reportError(Error generating jobs for time + time, e)
 }
 eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

DStreamGraph 的 generateJobs 方法調用了 OutputStream 的 generateJob 方法,OutputStream 就是 ForEachDStream。

def generateJobs(time: Time): Seq[Job] = {
 logDebug(Generating jobs for time + time)
 val jobs = this.synchronized {
 outputStreams.flatMap {outputStream =
 val jobOption = outputStream.generateJob(time)
 jobOption.foreach(_.setCallSite(outputStream.creationSite))
 jobOption
 }
 }
 logDebug(Generated + jobs.length + jobs for time + time)
 jobs
}

到此,相信大家對“DStream 與 RDD 關系是什么”有了更深的了解,不妨來實際操作一番吧!這里是丸趣 TV 網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計7991字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 微山县| 尉氏县| 犍为县| 鄄城县| 聂拉木县| 武隆县| 伊通| 内丘县| 金湖县| 肥东县| 淮阳县| 满洲里市| 哈巴河县| 永仁县| 资阳市| 杭州市| 清河县| 泸水县| 东乡| 巴彦县| 当阳市| 米脂县| 页游| 亳州市| 黄浦区| 宜阳县| 治县。| 阳新县| 延吉市| 雅江县| 兴义市| 南丹县| 东方市| 崇明县| 涿州市| 武汉市| 长子县| 阿拉尔市| 民勤县| 临湘市| 罗甸县|