共計 4245 個字符,預計需要花費 11 分鐘才能閱讀完成。
這篇文章將為大家詳細講解有關分布式數據集 SparkRDD 的依賴與緩存是怎樣的,文章內容質量較高,因此丸趣 TV 小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
RDD 簡介
RDD(Resilient Distributed Dataset)叫做分布式數據集,是 Spark 中最基本的數據抽象,它代表一個不可變、可分區、里面的元素可并行計算的集合。RDD 是一個類
RDD 的屬性
1. 一個列表,存儲存取每個 Partition 的優先位置(preferred location)。對于一個 HDFS 文件來說,這個列表保存的就是每個 Partition 所在的塊的位置。按照“移動數據不如移動計算”的理念,Spark 在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理數據塊的存儲位置。
2. 保存了計算每個分區的函數,這個計算方法會應用到每一個數據塊上,Spark 中 RDD 的計算是以分片為單位的,每個 RDD 都會實現 compute 函數以達到這個目的。compute 函數會對迭代器進行復合,不需要保存每次計算的結果。
3.RDD 之間的依賴關系。RDD 的每次轉換都會生成一個新的 RDD,所以 RDD 之間就會形成類似于流水線一樣的前后依賴關系。在部分分區數據丟失時,Spark 可以通過這個依賴關系重新計算丟失的分區數據,而不是對 RDD 的所有分區進行重新計算。
4.RDD 的分片函數(Partitioner),一個是基于哈希的 HashPartitioner,另外一個是基于范圍的 RangePartitioner。只有對于于 key-value 的 RDD,才會有 Partitioner,非 key-value 的 RDD 的 Parititioner 的值是 None。Partitioner 函數不但決定了 RDD 本身的分片數量,也決定了 parent RDD Shuffle 輸出時的分片數量。
5. 一組分片(Partition),即數據集的基本組成單位。對于 RDD 來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度。用戶可以在創建 RDD 時指定 RDD 的分片個數,如果沒有指定,那么就會采用默認值。默認值就是程序所分配到的 CPU Core 的數目。
如何創建 RDD
1. 通過序列化集合的方式創建 RDD(parallelize,makeRDD)
2. 通過讀取外部的數據源(testFile)
3. 通過其他的 rdd 做 transformation 操作轉換成行的 RDD
RDD 的兩種算子:
1.Transformation
map(func) : 返回一個新的分布式數據集,由每個原元素經過 func 函數轉換后組成
filter(func) : 返回一個新的數據集,由經過 func 函數后返回值為 true 的原元素組成
flatMap(func) : 類似于 map,但是每一個輸入元素,會被映射為 0 到多個輸出元素(因此,func 函數的返回值是一個 Seq,而不是單一元素)
flatMap(func) : 類似于 map,但是每一個輸入元素,會被映射為 0 到多個輸出元素(因此,func 函數的返回值是一個 Seq,而不是單一元素)
sample(withReplacement, frac, seed) :
根據 fraction 指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed 用于指定隨機數生成器種子
union(otherDataset) : 返回一個新的數據集,由原數據集和參數聯合而成
reduceByKey(func, [numTasks]) : 在一個 (K,V) 對的數據集上使用,返回一個 (K,V) 對的數據集,key 相同的值,都被使用指定的 reduce 函數聚合到一起。和 groupbykey 類似,任務的個數是可以通過第二個可選參數來配置的。
join(otherDataset, [numTasks]) :
在類型為 (K,V) 和(K,W)類型的數據集上調用,返回一個 (K,(V,W)) 對,每個 key 中的所有元素都在一起的數據集
groupWith(otherDataset, [numTasks]) : 在類型為 (K,V) 和(K,W)類型的數據集上調用,返回一個數據集,組成元素為(K, Seq[V], Seq[W]) Tuples。這個操作在其它框架,稱為 CoGroup
cartesian(otherDataset) : 笛卡爾積。但在數據集 T 和 U 上調用時,返回一個 (T,U) 對的數據集,所有元素交互進行笛卡爾積。
intersection(otherDataset): 對源 RDD 和參數 RDD 求交集后返回一個新的 RDD
distinct([numTasks])) 對源 RDD 進行去重后返回一個新的 RDD
groupByKey([numTasks]) 在一個 (K,V) 的 RDD 上調用,返回一個 (K, Iterator[V]) 的 RDD
reduceByKey(func, [numTasks]) 在一個 (K,V) 的 RDD 上調用,返回一個 (K,V) 的 RDD,使用指定的 reduce 函數,將相同 key 的值聚合到一起,與 groupByKey 類似,reduce 任務的個數可以通過第二個可選的參數來設置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一個 (K,V) 的 RDD 上調用,K 必須實現 Ordered 接口,返回一個按照 key 進行排序的 (K,V) 的 RDD
sortBy(func,[ascending], [numTasks]) 與 sortByKey 類似,但是更靈活
join(otherDataset, [numTasks]) 在類型為 (K,V) 和(K,W)的 RDD 上調用,返回一個相同 key 對應的所有元素對在一起的 (K,(V,W)) 的 RDD
cogroup(otherDataset, [numTasks]) 在類型為 (K,V) 和(K,W)的 RDD 上調用,返回一個(K,(Iterable
2.Action
reduce(func) 通過 func 函數聚集 RDD 中的所有元素,這個功能必須是課交換且可并聯的
collect() 在驅動程序中,以數組的形式返回數據集的所有元素
count() 返回 RDD 的元素個數
first() 返回 RDD 的 *** 個元素(類似于 take(1))
take(n) 返回一個由數據集的前 n 個元素組成的數組
takeSample(withReplacement,num, [seed]) 返回一個數組,該數組由從數據集中隨機采樣的 num 個元素組成,可以選擇是否用隨機數替換不足的部分,seed 用于指定隨機數生成器種子
takeOrdered(n, [ordering])
saveAsTextFile(path) 將數據集的元素以 textfile 的形式保存到 HDFS 文件系統或者其他支持的文件系統,對于每個元素,Spark 將會調用 toString 方法,將它裝換為文件中的文本
saveAsSequenceFile(path) 將數據集中的元素以 Hadoop sequencefile 的格式保存到指定的目錄下,可以使 HDFS 或者其他 Hadoop 支持的文件系統。
saveAsObjectFile(path)
countByKey() 針對 (K,V) 類型的 RDD,返回一個 (K,Int) 的 map,表示每一個 key 對應的元素個數。
foreach(func) 在數據集的每一個元素上,運行函數 func 進行更新。
RDD 的依賴關系
1. 窄依賴
窄依賴指的是每一個父 RDD 的 Partition 最多被子 RDD 的一個 Partition 使用
總結:窄依賴我們形象的比喻為獨生子女
2. 寬依賴
寬依賴指的是多個子 RDD 的 Partition 會依賴同一個父 RDD 的 Partition
總結:窄依賴我們形象的比喻為超生
3.Lineage(血統)
RDD 只支持粗粒度轉換,即在大量記錄上執行的單個操作。將創建 RDD 的一系列 Lineage(即血統)記錄下來,以便恢復丟失的分區。RDD 的 Lineage 會記錄 RDD 的元數據信息和轉換行為,當該 RDD 的部分分區數據丟失時,它可以根據這些信息來重新運算和恢復丟失的數據分區。
DAG 的生成
DAG(Directed Acyclic Graph)叫做有向無環圖,原始的 RDD 通過一系列的轉換就就形成了 DAG,根據 RDD 之間的依賴關系的不同將 DAG 劃分成不同的 Stage,對于窄依賴,partition 的轉換處理在 Stage 中完成計算。對于寬依賴,由于有 Shuffle 的存在,只能在 parent RDD 處理完成后,才能開始接下來的計算,因此寬依賴是劃分 Stage 的依據。
RDD 的緩存
Spark 速度非常快的原因之一,就是在不同操作中可以在內存中持久化或緩存個數據集。當持久化某個 RDD 后,每一個節點都將把計算的分片結果保存在內存中,并在對此 RDD 或衍生出的 RDD 進行的其他動作中重用。這使得后續的動作變得更加迅速。RDD 相關的持久化和緩存,是 Spark 最重要的特征之一。可以說,緩存是 Spark 構建迭代式算法和快速交互式查詢的關鍵。
找依賴關系劃分 stage 的目的之一就是劃分緩存,如何通過 stage 的劃分設置緩存?
(1)在窄依賴想設置緩存時用 cache
(2)在寬依賴想設置緩存時用 checkpoint
如何設置 cache 和 checkpoint?
cache:someRDD.cache()就添加成功緩存,放入到內存中
someRDD.persist(StorageLevel.MEMORY_AND_DISK):根據自己的需要設置緩存的位置(內存和硬盤)
checkpoint:可以把 RDD 計算后的數據存儲在本地磁盤上,也可以是 hdfs
sc.setCheckpointDIr(hdfs://hadoop1:9000/checkpoint)設置 checkpoint 的路徑 在寬依賴前設置
someRDD.checkpoint()設置 checkpoint
cache 和 checkpoint 的區別
cache 只是緩存數據,不改變 RDD 的依賴關系,checkpoint 生成了一個新的 RDD,后面的 RDD 將依賴新的 RDD 依賴關系已經改變 。數據恢復的順序:checkpoint —》cache–》重算
關于分布式數據集 SparkRDD 的依賴與緩存是怎樣的就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。