共計(jì) 3186 個(gè)字符,預(yù)計(jì)需要花費(fèi) 8 分鐘才能閱讀完成。
如何理解 Spark 中的核心概念 RDD,相信很多沒有經(jīng)驗(yàn)的人對(duì)此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。
RDD 全稱叫做彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets),它是一種分布式的內(nèi)存抽象,表示一個(gè)只讀的記錄分區(qū)的集合,它只能通過其他 RDD 轉(zhuǎn)換而創(chuàng)建,為此,RDD 支持豐富的轉(zhuǎn)換操作 (如: map, join, filter, groupBy 等),通過這種轉(zhuǎn)換操作,新的 RDD 則包含了如何從其他 RDDs 衍生所必需的信息,所以說 RDDs 之間是有依賴關(guān)系的。
基于 RDDs 之間的依賴,RDDs 會(huì)形成一個(gè)有向無環(huán)圖 DAG,該 DAG 描述了整個(gè)流式計(jì)算的流程,實(shí)際執(zhí)行的時(shí)候,RDD 是通過血緣關(guān)系 (Lineage) 一氣呵成的,即使出現(xiàn)數(shù)據(jù)分區(qū)丟失,也可以通過血緣關(guān)系重建分區(qū)。
總結(jié)起來,基于 RDD 的流式計(jì)算任務(wù)可描述為:從穩(wěn)定的物理存儲(chǔ) (如分布式文件系統(tǒng)) 中加載記錄,記錄被傳入由一組確定性操作構(gòu)成的 DAG,然后寫回穩(wěn)定存儲(chǔ)。另外 RDD 還可以將數(shù)據(jù)集緩存到內(nèi)存中,使得在多個(gè)操作之間可以重用數(shù)據(jù)集,基于這個(gè)特點(diǎn)可以很方便地構(gòu)建迭代型應(yīng)用 (圖計(jì)算、機(jī)器學(xué)習(xí)等) 或者交互式數(shù)據(jù)分析應(yīng)用。
可以說 Spark 最初也就是實(shí)現(xiàn) RDD 的一個(gè)分布式系統(tǒng),后面通過不斷發(fā)展壯大成為現(xiàn)在較為完善的大數(shù)據(jù)生態(tài)系統(tǒng),簡(jiǎn)單來講,Spark-RDD 的關(guān)系類似于 Hadoop-MapReduce 關(guān)系。
RDD 特點(diǎn)
RDD 表示只讀的分區(qū)的數(shù)據(jù)集,對(duì) RDD 進(jìn)行改動(dòng),只能通過 RDD 的轉(zhuǎn)換操作,由一個(gè) RDD 得到一個(gè)新的 RDD,新的 RDD 包含了從其他 RDD 衍生所必需的信息。
RDDs 之間存在依賴,RDD 的執(zhí)行是按照血緣關(guān)系延時(shí)計(jì)算的。如果血緣關(guān)系較長(zhǎng),可以通過持久化 RDD 來切斷血緣關(guān)系。
分區(qū)
如下圖所示,RDD 邏輯上是分區(qū)的,每個(gè)分區(qū)的數(shù)據(jù)是抽象存在的,計(jì)算的時(shí)候會(huì)通過一個(gè) compute 函數(shù)得到每個(gè)分區(qū)的數(shù)據(jù)。
如果 RDD 是通過已有的文件系統(tǒng)構(gòu)建,則 compute 函數(shù)是讀取指定文件系統(tǒng)中的數(shù)據(jù),如果 RDD 是通過其他 RDD 轉(zhuǎn)換而來,則 compute 函數(shù)是執(zhí)行轉(zhuǎn)換邏輯將其他 RDD 的數(shù)據(jù)進(jìn)行轉(zhuǎn)換。
只讀
如下圖所示,RDD 是只讀的,要想改變 RDD 中的數(shù)據(jù),只能在現(xiàn)有的 RDD 基礎(chǔ)上創(chuàng)建新的 RDD。
由一個(gè) RDD 轉(zhuǎn)換到另一個(gè) RDD,可以通過豐富的操作算子實(shí)現(xiàn),不再像 MapReduce 那樣只能寫 map 和 reduce 了,如下圖所示。
RDD 的操作算子包括兩類,一類叫做 transformations,它是用來將 RDD 進(jìn)行轉(zhuǎn)化,構(gòu)建 RDD 的血緣關(guān)系; 另一類叫做 actions,它是用來觸發(fā) RDD 的計(jì)算,得到 RDD 的相關(guān)計(jì)算結(jié)果或者將 RDD 保存的文件系統(tǒng)中。下圖是 RDD 所支持的操作算子列表。
依賴
RDDs 通過操作算子進(jìn)行轉(zhuǎn)換,轉(zhuǎn)換得到的新 RDD 包含了從其他 RDDs 衍生所必需的信息,RDDs 之間維護(hù)著這種血緣關(guān)系,也稱之為依賴。如下圖所示,依賴包括兩種,一種是窄依賴,RDDs 之間分區(qū)是一一對(duì)應(yīng)的,另一種是寬依賴,下游 RDD 的每個(gè)分區(qū)與上游 RDD(也稱之為父 RDD)的每個(gè)分區(qū)都有關(guān),是多對(duì)多的關(guān)系。
通過 RDDs 之間的這種依賴關(guān)系,一個(gè)任務(wù)流可以描述為 DAG(有向無環(huán)圖),如下圖所示,在實(shí)際執(zhí)行過程中寬依賴對(duì)應(yīng)于 Shuffle(圖中的 reduceByKey 和 join),窄依賴中的所有轉(zhuǎn)換操作可以通過類似于管道的方式一氣呵成執(zhí)行(圖中 map 和 union 可以一起執(zhí)行)。
緩存
如果在應(yīng)用程序中多次使用同一個(gè) RDD,可以將該 RDD 緩存起來,該 RDD 只有在 *** 次計(jì)算的時(shí)候會(huì)根據(jù)血緣關(guān)系得到分區(qū)的數(shù)據(jù),在后續(xù)其他地方用到該 RDD 的時(shí)候,會(huì)直接從緩存處取而不用再根據(jù)血緣關(guān)系計(jì)算,這樣就加速后期的重用。
如下圖所示,RDD- 1 經(jīng)過一系列的轉(zhuǎn)換后得到 RDD- n 并保存到 hdfs,RDD- 1 在這一過程中會(huì)有個(gè)中間結(jié)果,如果將其緩存到內(nèi)存,那么在隨后的 RDD- 1 轉(zhuǎn)換到 RDD- m 這一過程中,就不會(huì)計(jì)算其之前的 RDD- 0 了。
Checkpoint
雖然 RDD 的血緣關(guān)系天然地可以實(shí)現(xiàn)容錯(cuò),當(dāng) RDD 的某個(gè)分區(qū)數(shù)據(jù)失敗或丟失,可以通過血緣關(guān)系重建。但是對(duì)于長(zhǎng)時(shí)間迭代型應(yīng)用來說,隨著迭代的進(jìn)行,RDDs 之間的血緣關(guān)系會(huì)越來越長(zhǎng),一旦在后續(xù)迭代過程中出錯(cuò),則需要通過非常長(zhǎng)的血緣關(guān)系去重建,勢(shì)必影響性能。
為此,RDD 支持 checkpoint 將數(shù)據(jù)保存到持久化的存儲(chǔ)中,這樣就可以切斷之前的血緣關(guān)系,因?yàn)?checkpoint 后的 RDD 不需要知道它的父 RDDs 了,它可以從 checkpoint 處拿到數(shù)據(jù)。
小結(jié)
總結(jié)起來,給定一個(gè) RDD 我們至少可以知道如下幾點(diǎn)信息:1、分區(qū)數(shù)以及分區(qū)方式;2、由父 RDDs 衍生而來的相關(guān)依賴信息;3、計(jì)算每個(gè)分區(qū)的數(shù)據(jù),計(jì)算步驟為:1)如果被緩存,則從緩存中取的分區(qū)的數(shù)據(jù);2)如果被 Checkpoint,則從 Checkpoint 處恢復(fù)數(shù)據(jù);3)根據(jù)血緣關(guān)系計(jì)算分區(qū)的數(shù)據(jù)。
編程模型
在 Spark 中,RDD 被表示為對(duì)象,通過對(duì)象上的方法調(diào)用來對(duì) RDD 進(jìn)行轉(zhuǎn)換。經(jīng)過一系列的 Transformations 后,就可以調(diào)用 Actions 觸發(fā) RDD 的計(jì)算,Action 可以是向應(yīng)用程序返回結(jié)果(count, collect 等),或者是向存儲(chǔ)系統(tǒng)保存數(shù)據(jù)(saveAsTextFile 等)。在 Spark 中,只有遇到 Action,才會(huì)執(zhí)行 RDD 的計(jì)算(即懶執(zhí)行),這樣在運(yùn)行時(shí)可以通過管道的方式傳輸多個(gè)轉(zhuǎn)換。
要使用 Spark,開發(fā)者需要編寫一個(gè) Driver 程序,它被提交到集群以調(diào)度運(yùn)行 Worker,如下圖所示。Driver 中定義了一個(gè)或多個(gè) RDD,并調(diào)用 RDD 上的 action,Worker 則執(zhí)行 RDD 分區(qū)計(jì)算任務(wù)。
應(yīng)用舉例
下面介紹一個(gè)簡(jiǎn)單的 Spark 應(yīng)用程序?qū)嵗?WordCount,統(tǒng)計(jì)一個(gè)數(shù)據(jù)集中每個(gè)單詞出現(xiàn)的次數(shù),首先將從 HDFS 中加載數(shù)據(jù)得到原始 RDD-0,其中每條記錄為數(shù)據(jù)中的一行句子,經(jīng)過一個(gè) flatMap 操作,將一行句子切分為多個(gè)獨(dú)立的詞,得到 RDD-1,再通過 map 操作將每個(gè)詞映射為 key-value 形式,其中 key 為詞本身,value 為初始計(jì)數(shù)值 1,得到 RDD-2,將 RDD- 2 中的所有記錄歸并,統(tǒng)計(jì)每個(gè)詞的計(jì)數(shù),得到 RDD-3,*** 將其保存到 HDFS。
object WordCount { def main(args: Array[String]) { if (args.length 2) { System.err.println( Usage: WordCount inputfile outputfile System.exit(1); } val conf = new SparkConf().setAppName( WordCount) val sc = new SparkContext(conf) val result = sc.textFile(args(0)) .flatMap(line = line.split( )) .map(word = (word, 1)) .reduceByKey(_ + _) result.saveAsTextFile(args(1)) } }
結(jié)語
基于 RDD 實(shí)現(xiàn)的 Spark 相比于傳統(tǒng)的 Hadoop MapReduce 有什么優(yōu)勢(shì)呢? 總結(jié)起來應(yīng)該至少有三點(diǎn):
1.RDD 提供了豐富的操作算子,不再是只有 map 和 reduce 兩個(gè)操作了,對(duì)于描述應(yīng)用程序來說更加方便;
2. 通過 RDDs 之間的轉(zhuǎn)換構(gòu)建 DAG,中間結(jié)果不用落地;
3.RDD 支持緩存,可以在內(nèi)存中快速完成計(jì)算。
看完上述內(nèi)容,你們掌握如何理解 Spark 中的核心概念 RDD 的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注丸趣 TV 行業(yè)資訊頻道,感謝各位的閱讀!