久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Executor容錯安全性實例分析

156次閱讀
沒有評論

共計 6042 個字符,預計需要花費 16 分鐘才能閱讀完成。

這篇文章主要介紹“Executor 容錯安全性實例分析”,在日常操作中,相信很多人在 Executor 容錯安全性實例分析問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Executor 容錯安全性實例分析”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!

sparkstreaming 會不斷的接收數據、不斷的產生 job、不斷的提交 job。所以有一個至關重要的問題就是數據安全性。由于 sparkstreaming 是基于 sparkcore 的,如果我們可以確保數據安全可靠的話(sparkstreaming 生產 job 的時候里面是基于 RDD),即使運行的時候出現錯誤或者故障,也可以基于 RDD 的容錯的能力自動進行恢復。所以要確保數據的安全性。

對于 executor 的安全容錯主要是數據的安全容錯。Executor 計算時候的安全容錯是借助 spark core 的 RDD 的,所以天然是安全的。

數據安全性的一種方式是存儲一份副本,另一種方式是不做副本,但是數據源支持重放(也就是可以反復的讀取數據源的數據),如果之前讀取的數據出現問題,可以重新讀取數據。

做副本的方式可以借助 blockmanager 做備份。Blockmanager 存儲數據的時候有很多 storagelevel,Receiver 接收數據后,存儲的時候指定 storagelevel 為 MEMORY_AND_DISK_SER_2 的方式。Blockmanager 早存儲的時候會先考慮 memory,只有 memory 不夠的時候才會考慮 disk,一般 memory 都是夠的。所以至少兩個 executor 上都會有數據,假設一個 executor 掛掉,就會馬上切換到另一個 executor。

ReceiverSupervisorImpl 在存儲數據的時候會有兩種方式,一種是 WAL 的方式,究竟是不是 WAL 得方式是通過配置修改的。默認是 false。如果用 WAL 的方式必須有 checkpoint 的目錄,因為 WAL 的數據是放在 checkpoint 的目錄之下的。

def enableReceiverLog(conf: SparkConf): Boolean = {
 conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
}

Storagelevel 是在構建 inputDstream 的時候傳入的,默認就是 MEMORY_AND_DISK_SER_2。

* @param storageLevel  Storage level to use for storing the received objects
 *  (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
 hostname: String,
 port: Int,
 storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
 ): ReceiverInputDStream[String] = withNamedScope(socket text stream) {
 socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

現在來看 ReceiverSupervisorImpl 在存儲數據的另一種方式(副本方式)。注釋中說的很清楚,根據指定的 storagelevel 把接收的 blocks 交給 blockmanager。也就是通過 blockmanager 來存儲。

/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks into a block manager with the specified storage level.
 */
private[streaming] class BlockManagerBasedBlockHandler(
 blockManager: BlockManager, storageLevel: StorageLevel)

Blockmanager 存儲的時候會分為多種不同的數據類型,ArrayBufferBlock,IteratorBlock,ByteBufferBlock。

Blockmanager 存儲數據前面已經講過了。Receiver 在接收到數據后除了在自己這個 executor 上面存儲,還會在另外一個 executor 上存儲。如果一個 executor 出現問題會瞬間切換到另一個 executor。

WAL 的方式原理:在具體的目錄下會做一份日志,假設后續處理的過程中出了問題,可以基于日志恢復,日志是寫在 checkpoint 下。在生產環境下 checkpoint 是在 HDFS 上,這樣日志就會有三份副本。

下面就是用 WAL 存儲數據的類,先寫日志再交給 blockmanager 存儲。

/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks in both, a write ahead log and a block manager.
 */
private[streaming] class WriteAheadLogBasedBlockHandler(

如果采用 WAL 的方式,存儲數據的時候就不需要有兩份副本,這樣太浪費內存,如果 storagelevel.replication 大于 1 就會打印警告日志。

private val effectiveStorageLevel = {
 if (storageLevel.deserialized) {
 logWarning(s Storage level serialization ${storageLevel.deserialized} is not supported when  +
 s write ahead log is enabled, change to serialization false )
 }
 if (storageLevel.replication  1) {
 logWarning(s Storage level replication ${storageLevel.replication} is unnecessary when  +
 s write ahead log is enabled, change to replication 1 )
 }

 StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}

這里采用兩條線程的線程池,使得 blockmanager 存儲數據和 write ahead log 可以并發的執行。

// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
implicit private val executionContext = ExecutionContext.fromExecutorService(
 ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))

這個是把日志寫入 WAL 中

// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
 writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}

負責讀寫 WAL 的是 WriteAheadLog,這是一個抽象類,負責寫入、讀取、清除數據的功能。在寫入數據后會返回一個句柄,以供讀取數據使用。

看一下具體寫入數據的實現。如果失敗并且失敗次數小于最大的失敗次數就會重試。確實是返回了一個句柄。

/**
 * Write a byte buffer to the log file. This method synchronously writes the data in the
 * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
 * to HDFS, and will be available for readers to read.
 */
def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
 var fileSegment: FileBasedWriteAheadLogSegment = null
 var failures = 0
 var lastException: Exception = null
 var succeeded = false
 while (!succeeded failures maxFailures) {
 try {
 fileSegment = getLogWriter(time).write(byteBuffer)
 if (closeFileAfterWrite) {
 resetWriter()
 }
 succeeded = true
 } catch {
 case ex: Exception =
 lastException = ex
 logWarning(Failed to write to write ahead log)
 resetWriter()
 failures += 1
 }
 }
 if (fileSegment == null) {
 logError(s Failed to write to write ahead log after $failures failures)
 throw lastException
 }
 fileSegment
}

下面就是把數據寫入 HDFS 的代碼

/** Write the bytebuffer to the log file */
def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
 assertOpen()
 data.rewind() // Rewind to ensure all data in the buffer is retrieved
 val lengthToWrite = data.remaining()
 val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
 stream.writeInt(lengthToWrite)
 if (data.hasArray) {
 stream.write(data.array())
 } else {
 // If the buffer is not backed by an array, we transfer using temp array
 // Note that despite the extra array copy, this should be faster than byte-by-byte copy
 while (data.hasRemaining) {
 val array = new Array[Byte](data.remaining)
 data.get(array)
 stream.write(array)
 }
 }
 flush()
 nextOffset = stream.getPos()
 segment
}

不管是 WAL 還是直接交給 blockmanager 都是采用副本的方式。還有一種是數據源支持數據存放,典型的就是 kafka。Kafka 已經成為了數據存儲系統,它天然具有容錯和數據副本。

Kafka 有 receiver 和 direct 的方式。Receiver 的方式其實是交給 zookeper 來管理 matadata 的(偏移量 offset),如果數據處理失敗后,kafka 會基于 offset 重新讀取數據。為什么可以重新讀取?如果程序崩潰或者數據沒處理完是不會給 zookeper 發 ack。Zookeper 就認為這個數據沒有被消費。實際生產環境下越來越多的使用 directAPI 的方式,直接去操作 kafka 并且是自己管理 offset。這就可以保證有且只有一次的容錯處理。DirectKafkaInputDstream,它會去看最新的 offset,并把這個內容放入 batch 中。

獲取最新的 offset,通過最新的 offset 減去上一個 offset 就可以確定讀哪些數據,也就是一個 batch 中的數據。

@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
 val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
 // Either.fold would confuse @tailrec, do it manually
 if (o.isLeft) {
 val err = o.left.get.toString
 if (retries = 0) {
 throw new SparkException(err)
 } else {
 log.error(err)
 Thread.sleep(kc.config.refreshLeaderBackoffMs)
 latestLeaderOffsets(retries – 1)
 }
 } else {
 o.right.get
 }
}

容錯的弊端就是消耗性能,占用時間。也不是所有情況都不能容忍數據丟失。有些情況下可以不進行容錯來提高性能。

假如一次處理 1000 個 block,但是有 1 個 block 出錯,就需要把 1000 個 block 進行重新讀取或者恢復,這也有性能問題。

到此,關于“Executor 容錯安全性實例分析”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計6042字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 荔波县| 民县| 新邵县| 武穴市| 合水县| 江源县| 伊金霍洛旗| 留坝县| 五大连池市| 五华县| 光泽县| 定远县| 平泉县| 临沧市| 潞城市| 富顺县| 赞皇县| 江陵县| 澳门| 巴青县| 丰都县| 南平市| 宜良县| 临漳县| 虹口区| 霸州市| 扎兰屯市| 叙永县| 光山县| 逊克县| 壶关县| 桃园县| 漾濞| 贡觉县| 尉氏县| 罗山县| 黑河市| 大方县| 宁海县| 台湾省| 上高县|