共計(jì) 4057 個(gè)字符,預(yù)計(jì)需要花費(fèi) 11 分鐘才能閱讀完成。
本篇內(nèi)容主要講解“Spark SQL 的 Join 實(shí)現(xiàn)方法有哪些”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓丸趣 TV 小編來帶大家學(xué)習(xí)“Spark SQL 的 Join 實(shí)現(xiàn)方法有哪些”吧!
SparkSQL 總體流程介紹
在闡述 Join 實(shí)現(xiàn)之前,我們首先簡(jiǎn)單介紹 SparkSQL 的總體流程,一般地,我們有兩種方式使用 SparkSQL,一種是直接寫 sql 語句,這個(gè)需要有元數(shù)據(jù)庫支持,例如 Hive 等,另一種是通過 Dataset/DataFrame 編寫 Spark 應(yīng)用程序。如下圖所示,sql 語句被語法解析 (SQL AST) 成查詢計(jì)劃,或者我們通過 Dataset/DataFrame 提供的 APIs 組織成查詢計(jì)劃,查詢計(jì)劃分為兩大類:邏輯計(jì)劃和物理計(jì)劃,這個(gè)階段通常叫做邏輯計(jì)劃,經(jīng)過語法分析 (Analyzer)、一系列查詢優(yōu)化(Optimizer) 后得到優(yōu)化后的邏輯計(jì)劃,最后被映射成物理計(jì)劃,轉(zhuǎn)換成 RDD 執(zhí)行。
Join 基本要素
如下圖所示,Join 大致包括三個(gè)要素:Join 方式、Join 條件以及過濾條件。其中過濾條件也可以通過 AND 語句放在 Join 條件中。
Spark 支持所有類型的 Join,包括:
inner join
left outer join
right outer join
full outer join
left semi join
left anti join
下面分別闡述這幾種 Join 的實(shí)現(xiàn)。
Join 基本實(shí)現(xiàn)流程
總體上來說,Join 的基本實(shí)現(xiàn)流程如下圖所示,Spark 將參與 Join 的兩張表抽象為流式遍歷表 (streamIter) 和查找表(buildIter),通常 streamIter 為大表,buildIter 為小表,我們不用擔(dān)心哪個(gè)表為 streamIter,哪個(gè)表為 buildIter,這個(gè) spark 會(huì)根據(jù) join 語句自動(dòng)幫我們完成。
在實(shí)際計(jì)算時(shí),spark 會(huì)基于 streamIter 來遍歷,每次取出 streamIter 中的一條記錄 rowA,根據(jù) Join 條件計(jì)算 keyA,然后根據(jù)該 keyA 去 buildIter 中查找所有滿足 Join 條件 (keyB==keyA) 的記錄 rowBs,并將 rowBs 中每條記錄分別與 rowAjoin 得到 join 后的記錄,最后根據(jù)過濾條件得到最終 join 的記錄。
從上述計(jì)算過程中不難發(fā)現(xiàn),對(duì)于每條來自 streamIter 的記錄,都要去 buildIter 中查找匹配的記錄,所以 buildIter 一定要是查找性能較優(yōu)的數(shù)據(jù)結(jié)構(gòu)。spark 提供了三種 join 實(shí)現(xiàn):sort merge join、broadcast join 以及 hash join。
sort merge join 實(shí)現(xiàn)
要讓兩條記錄能 join 到一起,首先需要將具有相同 key 的記錄在同一個(gè)分區(qū),所以通常來說,需要做一次 shuffle,map 階段根據(jù) join 條件確定每條記錄的 key,基于該 key 做 shuffle write,將可能 join 到一起的記錄分到同一個(gè)分區(qū)中,這樣在 shuffle read 階段就可以將兩個(gè)表中具有相同 key 的記錄拉到同一個(gè)分區(qū)處理。前面我們也提到,對(duì)于 buildIter 一定要是查找性能較優(yōu)的數(shù)據(jù)結(jié)構(gòu),通常我們能想到 hash 表,但是對(duì)于一張較大的表來說,不可能將所有記錄全部放到 hash 表中,另外也可以對(duì) buildIter 先排序,查找時(shí)按順序查找,查找代價(jià)也是可以接受的,我們知道,spark shuffle 階段天然就支持排序,這個(gè)是非常好實(shí)現(xiàn)的,下面是 sort merge join 示意圖。
在 shuffle read 階段,分別對(duì) streamIter 和 buildIter 進(jìn)行 merge sort,在遍歷 streamIter 時(shí),對(duì)于每條記錄,都采用順序查找的方式從 buildIter 查找對(duì)應(yīng)的記錄,由于兩個(gè)表都是排序的,每次處理完 streamIter 的一條記錄后,對(duì)于 streamIter 的下一條記錄,只需從 buildIter 中上一次查找結(jié)束的位置開始查找,所以說每次在 buildIter 中查找不必重頭開始,整體上來說,查找性能還是較優(yōu)的。
broadcast join 實(shí)現(xiàn)
為了能具有相同 key 的記錄分到同一個(gè)分區(qū),我們通常是做 shuffle,那么如果 buildIter 是一個(gè)非常小的表,那么其實(shí)就沒有必要大動(dòng)干戈做 shuffle 了,直接將 buildIter 廣播到每個(gè)計(jì)算節(jié)點(diǎn),然后將 buildIter 放到 hash 表中,如下圖所示。
從上圖可以看到,不用做 shuffle,可以直接在一個(gè) map 中完成,通常這種 join 也稱之為 map join。那么問題來了,什么時(shí)候會(huì)用 broadcast join 實(shí)現(xiàn)呢?這個(gè)不用我們擔(dān)心,spark sql 自動(dòng)幫我們完成,當(dāng) buildIter 的估計(jì)大小不超過參數(shù) spark.sql.autoBroadcastJoinThreshold 設(shè)定的值(默認(rèn) 10M),那么就會(huì)自動(dòng)采用 broadcast join,否則采用 sort merge join。
hash join 實(shí)現(xiàn)
除了上面兩種 join 實(shí)現(xiàn)方式外,spark 還提供了 hash join 實(shí)現(xiàn)方式,在 shuffle read 階段不對(duì)記錄排序,反正來自兩格表的具有相同 key 的記錄會(huì)在同一個(gè)分區(qū),只是在分區(qū)內(nèi)不排序,將來自 buildIter 的記錄放到 hash 表中,以便查找,如下圖所示。
不難發(fā)現(xiàn),要將來自 buildIter 的記錄放到 hash 表中,那么每個(gè)分區(qū)來自 buildIter 的記錄不能太大,否則就存不下,默認(rèn)情況下 hash join 的實(shí)現(xiàn)是關(guān)閉狀態(tài),如果要使用 hash join,必須滿足以下四個(gè)條件:
buildIter 總體估計(jì)大小超過 spark.sql.autoBroadcastJoinThreshold 設(shè)定的值,即不滿足 broadcast join 條件
開啟嘗試使用 hash join 的開關(guān),spark.sql.join.preferSortMergeJoin=false
每個(gè)分區(qū)的平均大小不超過 spark.sql.autoBroadcastJoinThreshold 設(shè)定的值,即 shuffle read 階段每個(gè)分區(qū)來自 buildIter 的記錄要能放到內(nèi)存中
streamIter 的大小是 buildIter 三倍以上
所以說,使用 hash join 的條件其實(shí)是很苛刻的,在大多數(shù)實(shí)際場(chǎng)景中,即使能使用 hash join,但是使用 sort merge join 也不會(huì)比 hash join 差很多,所以盡量使用 hash
下面我們分別闡述不同 Join 方式的實(shí)現(xiàn)流程。
inner join
inner join 是一定要找到左右表中滿足 join 條件的記錄,我們?cè)趯?sql 語句或者使用 DataFrame 時(shí),可以不用關(guān)心哪個(gè)是左表,哪個(gè)是右表,在 spark sql 查詢優(yōu)化階段,spark 會(huì)自動(dòng)將大表設(shè)為左表,即 streamIter,將小表設(shè)為右表,即 buildIter。這樣對(duì)小表的查找相對(duì)更優(yōu)。其基本實(shí)現(xiàn)流程如下圖所示,在查找階段,如果右表不存在滿足 join 條件的記錄,則跳過。
left outer join
left outer join 是以左表為準(zhǔn),在右表中查找匹配的記錄,如果查找失敗,則返回一個(gè)所有字段都為 null 的記錄。我們?cè)趯?sql 語句或者使用 DataFrmae 時(shí),一般讓大表在左邊,小表在右邊。其基本實(shí)現(xiàn)流程如下圖所示。
right outer join
right outer join 是以右表為準(zhǔn),在左表中查找匹配的記錄,如果查找失敗,則返回一個(gè)所有字段都為 null 的記錄。所以說,右表是 streamIter,左表是 buildIter,我們?cè)趯?sql 語句或者使用 DataFrame 時(shí),一般讓大表在右邊,小表在左邊。其基本實(shí)現(xiàn)流程如下圖所示。
full outer join
full outer join 相對(duì)來說要復(fù)雜一點(diǎn),總體上來看既要做 left outer join,又要做 right outer join,但是又不能簡(jiǎn)單地先 left outer join,再 right outer join,最后 union 得到最終結(jié)果,因?yàn)檫@樣最終結(jié)果中就存在兩份 inner join 的結(jié)果了。因?yàn)榧热煌瓿?left outer join 又要完成 right outer join,所以 full outer join 僅采用 sort merge join 實(shí)現(xiàn),左邊和右表既要作為 streamIter,又要作為 buildIter,其基本實(shí)現(xiàn)流程如下圖所示。
由于左表和右表已經(jīng)排好序,首先分別順序取出左表和右表中的一條記錄,比較 key,如果 key 相等,則 joinrowA 和 rowB,并將 rowA 和 rowB 分別更新到左表和右表的下一條記錄;如果 keyA keyB,則說明右表中沒有與左表 rowA 對(duì)應(yīng)的記錄,那么 joinrowA 與 nullRow,緊接著,rowA 更新到左表的下一條記錄;如果 keyA keyB,則說明左表中沒有與右表 rowB 對(duì)應(yīng)的記錄,那么 joinnullRow 與 rowB,緊接著,rowB 更新到右表的下一條記錄。如此循環(huán)遍歷直到左表和右表的記錄全部處理完。
left semi join
left semi join 是以左表為準(zhǔn),在右表中查找匹配的記錄,如果查找成功,則僅返回左邊的記錄,否則返回 null,其基本實(shí)現(xiàn)流程如下圖所示。
left anti join
left anti join 與 left semi join 相反,是以左表為準(zhǔn),在右表中查找匹配的記錄,如果查找成功,則返回 null,否則僅返回左邊的記錄,其基本實(shí)現(xiàn)流程如下圖所示。
總結(jié)
Join 是數(shù)據(jù)庫查詢中一個(gè)非常重要的語法特性,在數(shù)據(jù)庫領(lǐng)域可以說是“得 join 者得天下”,SparkSQL 作為一種分布式數(shù)據(jù)倉庫系統(tǒng),給我們提供了全面的 join 支持,并在內(nèi)部實(shí)現(xiàn)上無聲無息地做了很多優(yōu)化,了解 join 的實(shí)現(xiàn)將有助于我們更深刻的了解我們的應(yīng)用程序的運(yùn)行軌跡。
到此,相信大家對(duì)“Spark SQL 的 Join 實(shí)現(xiàn)方法有哪些”有了更深的了解,不妨來實(shí)際操作一番吧!這里是丸趣 TV 網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!