共計 4941 個字符,預計需要花費 13 分鐘才能閱讀完成。
這篇“Spark 中兩個類似的 api 是什么”文章的知識點大部分人都不太理解,所以丸趣 TV 小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Spark 中兩個類似的 api 是什么”文章吧。
Spark 中有兩個類似的 api,分別是 reduceByKey 和 groupByKey 。這兩個的功能類似,但底層實現卻有些不同,那么為什么要這樣設計呢?我們來從源碼的角度分析一下。
先看兩者的調用順序(都是使用默認的 Partitioner,即 defaultPartitioner)
所用 spark 版本:spark 2.1.0
#### 先看 reduceByKey
Step1
“`
def reduceByKey(func: (V, V) = V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
“`
Setp2
“`
def reduceByKey(partitioner: Partitioner, func: (V, V) = V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) = v, func, func, partitioner)
}
“`
Setp3
“`
def combineByKeyWithClassTag[C](
createCombiner: V = C,
mergeValue: (C, V) = C,
mergeCombiners: (C, C) = C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, mergeCombiners must be defined) // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException(Cannot use map-side combining with array keys.)
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException(HashPartitioner cannot partition array keys.)
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter = {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
“`
姑且不去看方法里面的細節,我們會只要知道最后調用的是 combineByKeyWithClassTag 這個方法。這個方法有兩個參數我們來重點看一下,
“`
def combineByKeyWithClassTag[C](
createCombiner: V = C,
mergeValue: (C, V) = C,
mergeCombiners: (C, C) = C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)
“`
首先是 **partitioner** 參數,這個即是 RDD 的分區設置。除了默認的 defaultPartitioner,Spark 還提供了 RangePartitioner 和 HashPartitioner 外,此外用戶也可以自定義 partitioner。通過源碼可以發現如果是 HashPartitioner 的話,那么是會拋出一個錯誤的。
然后是 **mapSideCombine** 參數,這個參數正是 reduceByKey 和 groupByKey 最大不同的地方,它決定是是否會先在節點上進行一次 Combine 操作,下面會有更具體的例子來介紹。
#### 然后是 groupByKey
Step1
“`
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}
“`
Step2
“`
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn t use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) = CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) = buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) = c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
“`
Setp3
“`
def combineByKeyWithClassTag[C](
createCombiner: V = C,
mergeValue: (C, V) = C,
mergeCombiners: (C, C) = C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, mergeCombiners must be defined) // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException(Cannot use map-side combining with array keys.)
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException(HashPartitioner cannot partition array keys.)
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter = {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
“`
結合上面 reduceByKey 的調用鏈,可以發現最終其實都是調用 combineByKeyWithClassTag 這個方法的,但調用的參數不同。
reduceByKey 的調用
“`
combineByKeyWithClassTag[V]((v: V) = v, func, func, partitioner)
“`
groupByKey 的調用
“`
combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
“`
正是兩者不同的調用方式導致了兩個方法的差別,我們分別來看
– reduceByKey 的泛型參數直接是 [V],而 groupByKey 的泛型參數是 [CompactBuffer[V]]。這直接導致了 reduceByKey 和 groupByKey 的返回值不同,前者是 RDD[(K, V)],而后者是 RDD[(K, Iterable[V])]
– 然后就是 mapSideCombine = false 了,這個 mapSideCombine 參數的默認是 true 的。這個值有什么用呢,上面也說了,這個參數的作用是控制要不要在 map 端進行初步合并(Combine)。可以看看下面具體的例子。
img src= https://cache.yisu.com/upload/information/20210523/355/698556.png width= 65% /
img src= https://cache.yisu.com/upload/information/20210523/355/698557.png width= 65% /
從功能上來說,可以發現 ReduceByKey 其實就是會在每個節點先進行一次 ** 合并 ** 的操作,而 groupByKey 沒有。
這么來看 ReduceByKey 的性能會比 groupByKey 好很多,因為有些工作在節點已經處理了。
以上就是關于“Spark 中兩個類似的 api 是什么”這篇文章的內容,相信大家都有了一定的了解,希望丸趣 TV 小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注丸趣 TV 行業資訊頻道。