久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

DB數(shù)據(jù)同步到數(shù)據(jù)倉庫的示例分析

138次閱讀
沒有評論

共計(jì) 5176 個字符,預(yù)計(jì)需要花費(fèi) 13 分鐘才能閱讀完成。

這篇文章給大家分享的是有關(guān) DB 數(shù)據(jù)同步到數(shù)據(jù)倉庫的示例分析的內(nèi)容。丸趣 TV 小編覺得挺實(shí)用的,因此分享給大家做個參考,一起跟隨丸趣 TV 小編過來看看吧。

背景

在數(shù)據(jù)倉庫建模中,未經(jīng)任何加工處理的原始業(yè)務(wù)層數(shù)據(jù),我們稱之為 ODS(Operational Data Store) 數(shù)據(jù)。在互聯(lián)網(wǎng)企業(yè)中,常見的 ODS 數(shù)據(jù)有業(yè)務(wù)日志數(shù)據(jù)(Log)和業(yè)務(wù) DB 數(shù)據(jù)(DB)兩類。對于業(yè)務(wù) DB 數(shù)據(jù)來說,從 MySQL 等關(guān)系型數(shù)據(jù)庫的業(yè)務(wù)數(shù)據(jù)進(jìn)行采集,然后導(dǎo)入到 Hive 中,是進(jìn)行數(shù)據(jù)倉庫生產(chǎn)的重要環(huán)節(jié)。

如何準(zhǔn)確、高效地把 MySQL 數(shù)據(jù)同步到 Hive 中?一般常用的解決方案是批量取數(shù)并 Load:直連 MySQL 去 Select 表中的數(shù)據(jù),然后存到本地文件作為中間存儲,最后把文件 Load 到 Hive 表中。這種方案的優(yōu)點(diǎn)是實(shí)現(xiàn)簡單,但是隨著業(yè)務(wù)的發(fā)展,缺點(diǎn)也逐漸暴露出來:

性能瓶頸:隨著業(yè)務(wù)規(guī)模的增長,Select From MySQL – Save to Localfile – Load to Hive 這種數(shù)據(jù)流花費(fèi)的時間越來越長,無法滿足下游數(shù)倉生產(chǎn)的時間要求。

直接從 MySQL 中 Select 大量數(shù)據(jù),對 MySQL 的影響非常大,容易造成慢查詢,影響業(yè)務(wù)線上的正常服務(wù)。

由于 Hive 本身的語法不支持更新、刪除等 SQL 原語,對于 MySQL 中發(fā)生 Update/Delete 的數(shù)據(jù)無法很好地進(jìn)行支持。

為了徹底解決這些問題,我們逐步轉(zhuǎn)向 CDC (Change Data Capture) + Merge 的技術(shù)方案,即實(shí)時 Binlog 采集 + 離線處理 Binlog 還原業(yè)務(wù)數(shù)據(jù)這樣一套解決方案。Binlog 是 MySQL 的二進(jìn)制日志,記錄了 MySQL 中發(fā)生的所有數(shù)據(jù)變更,MySQL 集群自身的主從同步就是基于 Binlog 做的。

本文主要從 Binlog 實(shí)時采集和離線處理 Binlog 還原業(yè)務(wù)數(shù)據(jù)兩個方面,來介紹如何實(shí)現(xiàn) DB 數(shù)據(jù)準(zhǔn)確、高效地進(jìn)入數(shù)倉。

整體架構(gòu)

整體的架構(gòu)如上圖所示。在 Binlog 實(shí)時采集方面,我們采用了阿里巴巴的開源項(xiàng)目 Canal,負(fù)責(zé)從 MySQL 實(shí)時拉取 Binlog 并完成適當(dāng)解析。Binlog 采集后會暫存到 Kafka 上供下游消費(fèi)。整體實(shí)時采集部分如圖中紅色箭頭所示。

離線處理 Binlog 的部分,如圖中黑色箭頭所示,通過下面的步驟在 Hive 上還原一張 MySQL 表:

采用 Linkedin 的開源項(xiàng)目 Camus,負(fù)責(zé)每小時把 Kafka 上的 Binlog 數(shù)據(jù)拉取到 Hive 上。

對每張 ODS 表,首先需要一次性制作快照(Snapshot),把 MySQL 里的存量數(shù)據(jù)讀取到 Hive 上,這一過程底層采用直連 MySQL 去 Select 數(shù)據(jù)的方式。

對每張 ODS 表,每天基于存量數(shù)據(jù)和當(dāng)天增量產(chǎn)生的 Binlog 做 Merge,從而還原出業(yè)務(wù)數(shù)據(jù)。

我們回過頭來看看,背景中介紹的批量取數(shù)并 Load 方案遇到的各種問題,為什么用這種方案能解決上面的問題呢?

首先,Binlog 是流式產(chǎn)生的,通過對 Binlog 的實(shí)時采集,把部分?jǐn)?shù)據(jù)處理需求由每天一次的批處理分?jǐn)偟綄?shí)時流上。無論從性能上還是對 MySQL 的訪問壓力上,都會有明顯地改善。

第二,Binlog 本身記錄了數(shù)據(jù)變更的類型(Insert/Update/Delete),通過一些語義方面的處理,完全能夠做到精準(zhǔn)的數(shù)據(jù)還原。

Binlog 實(shí)時采集

對 Binlog 的實(shí)時采集包含兩個主要模塊:一是 CanalManager,主要負(fù)責(zé)采集任務(wù)的分配、監(jiān)控報(bào)警、元數(shù)據(jù)管理以及和外部依賴系統(tǒng)的對接;二是真正執(zhí)行采集任務(wù)的 Canal 和 CanalClient。

當(dāng)用戶提交某個 DB 的 Binlog 采集請求時,CanalManager 首先會調(diào)用 DBA 平臺的相關(guān)接口,獲取這一 DB 所在 MySQL 實(shí)例的相關(guān)信息,目的是從中選出最適合 Binlog 采集的機(jī)器。然后把采集實(shí)例(Canal Instance)分發(fā)到合適的 Canal 服務(wù)器上,即 CanalServer 上。在選擇具體的 CanalServer 時,CanalManager 會考慮負(fù)載均衡、跨機(jī)房傳輸?shù)纫蛩兀瑑?yōu)先選擇負(fù)載較低且同地域傳輸?shù)臋C(jī)器。

CanalServer 收到采集請求后,會在 ZooKeeper 上對收集信息進(jìn)行注冊。注冊的內(nèi)容包括:

以 Instance 名稱命名的永久節(jié)點(diǎn)。

在該永久節(jié)點(diǎn)下注冊以自身 ip:port 命名的臨時節(jié)點(diǎn)。

這樣做的目的有兩個:

高可用:CanalManager 對 Instance 進(jìn)行分發(fā)時,會選擇兩臺 CanalServer,一臺是 Running 節(jié)點(diǎn),另一臺作為 Standby 節(jié)點(diǎn)。Standby 節(jié)點(diǎn)會對該 Instance 進(jìn)行監(jiān)聽,當(dāng) Running 節(jié)點(diǎn)出現(xiàn)故障后,臨時節(jié)點(diǎn)消失,然后 Standby 節(jié)點(diǎn)進(jìn)行搶占。這樣就達(dá)到了容災(zāi)的目的。

與 CanalClient 交互:CanalClient 檢測到自己負(fù)責(zé)的 Instance 所在的 Running CanalServer 后,便會進(jìn)行連接,從而接收到 CanalServer 發(fā)來的 Binlog 數(shù)據(jù)。

對 Binlog 的訂閱以 MySQL 的 DB 為粒度,一個 DB 的 Binlog 對應(yīng)了一個 Kafka Topic。底層實(shí)現(xiàn)時,一個 MySQL 實(shí)例下所有訂閱的 DB,都由同一個 Canal Instance 進(jìn)行處理。這是因?yàn)?Binlog 的產(chǎn)生是以 MySQL 實(shí)例為粒度的。CanalServer 會拋棄掉未訂閱的 Binlog 數(shù)據(jù),然后 CanalClient 將接收到的 Binlog 按 DB 粒度分發(fā)到 Kafka 上。

離線還原 MySQL 數(shù)據(jù)

完成 Binlog 采集后,下一步就是利用 Binlog 來還原業(yè)務(wù)數(shù)據(jù)。首先要解決的第一個問題是把 Binlog 從 Kafka 同步到 Hive 上。

Kafka2Hive

整個 Kafka2Hive 任務(wù)的管理,在美團(tuán)數(shù)據(jù)平臺的 ETL 框架下進(jìn)行,包括任務(wù)原語的表達(dá)和調(diào)度機(jī)制等,都同其他 ETL 類似。而底層采用 LinkedIn 的開源項(xiàng)目 Camus,并進(jìn)行了有針對性的二次開發(fā),來完成真正的 Kafka2Hive 數(shù)據(jù)傳輸工作。

對 Camus 的二次開發(fā)

Kafka 上存儲的 Binlog 未帶 Schema,而 Hive 表必須有 Schema,并且其分區(qū)、字段等的設(shè)計(jì),都要便于下游的高效消費(fèi)。對 Camus 做的第一個改造,便是將 Kafka 上的 Binlog 解析成符合目標(biāo) Schema 的格式。

對 Camus 做的第二個改造,由美團(tuán)的 ETL 框架所決定。在我們的任務(wù)調(diào)度系統(tǒng)中,目前只對同調(diào)度隊(duì)列的任務(wù)做上下游依賴關(guān)系的解析,跨調(diào)度隊(duì)列是不能建立依賴關(guān)系的。而在 MySQL2Hive 的整個流程中,Kafka2Hive 的任務(wù)需要每小時執(zhí)行一次(小時隊(duì)列),Merge 任務(wù)每天執(zhí)行一次(天隊(duì)列)。而 Merge 任務(wù)的啟動必須要嚴(yán)格依賴小時 Kafka2Hive 任務(wù)的完成。

為了解決這一問題,我們引入了 Checkdone 任務(wù)。Checkdone 任務(wù)是天任務(wù),主要負(fù)責(zé)檢測前一天的 Kafka2Hive 是否成功完成。如果成功完成了,則 Checkdone 任務(wù)執(zhí)行成功,這樣下游的 Merge 任務(wù)就可以正確啟動了。

Checkdone 的檢測邏輯

Checkdone 是怎樣檢測的呢?每個 Kafka2Hive 任務(wù)成功完成數(shù)據(jù)傳輸后,由 Camus 負(fù)責(zé)在相應(yīng)的 HDFS 目錄下記錄該任務(wù)的啟動時間。Checkdone 會掃描前一天的所有時間戳,如果最大的時間戳已經(jīng)超過了 0 點(diǎn),就說明前一天的 Kafka2Hive 任務(wù)都成功完成了,這樣 Checkdone 就完成了檢測。

此外,由于 Camus 本身只是完成了讀 Kafka 然后寫 HDFS 文件的過程,還必須完成對 Hive 分區(qū)的加載才能使下游查詢到。因此,整個 Kafka2Hive 任務(wù)的最后一步是加載 Hive 分區(qū)。這樣,整個任務(wù)才算成功執(zhí)行。

每個 Kafka2Hive 任務(wù)負(fù)責(zé)讀取一個特定的 Topic,把 Binlog 數(shù)據(jù)寫入 original_binlog 庫下的一張表中,即前面圖中的 original_binlog.db,其中存儲的是對應(yīng)到一個 MySQL DB 的全部 Binlog。

上圖說明了一個 Kafka2Hive 完成后,文件在 HDFS 上的目錄結(jié)構(gòu)。假如一個 MySQL DB 叫做 user,對應(yīng)的 Binlog 存儲在 original_binlog.user 表中。ready 目錄中,按天存儲了當(dāng)天所有成功執(zhí)行的 Kafka2Hive 任務(wù)的啟動時間,供 Checkdone 使用。每張表的 Binlog,被組織到一個分區(qū)中,例如 userinfo 表的 Binlog,存儲在 table_name=userinfo 這一分區(qū)中。每個 table_name 一級分區(qū)下,按 dt 組織二級分區(qū)。圖中的 xxx.lzo 和 xxx.lzo.index 文件,存儲的是經(jīng)過 lzo 壓縮的 Binlog 數(shù)據(jù)。

Merge

Binlog 成功入倉后,下一步要做的就是基于 Binlog 對 MySQL 數(shù)據(jù)進(jìn)行還原。Merge 流程做了兩件事,首先把當(dāng)天生成的 Binlog 數(shù)據(jù)存放到 Delta 表中,然后和已有的存量數(shù)據(jù)做一個基于主鍵的 Merge。Delta 表中的數(shù)據(jù)是當(dāng)天的最新數(shù)據(jù),當(dāng)一條數(shù)據(jù)在一天內(nèi)發(fā)生多次變更時,Delta 表中只存儲最后一次變更后的數(shù)據(jù)。

把 Delta 數(shù)據(jù)和存量數(shù)據(jù)進(jìn)行 Merge 的過程中,需要有唯一鍵來判定是否是同一條數(shù)據(jù)。如果同一條數(shù)據(jù)既出現(xiàn)在存量表中,又出現(xiàn)在 Delta 表中,說明這一條數(shù)據(jù)發(fā)生了更新,則選取 Delta 表的數(shù)據(jù)作為最終結(jié)果;否則說明沒有發(fā)生任何變動,保留原來存量表中的數(shù)據(jù)作為最終結(jié)果。Merge 的結(jié)果數(shù)據(jù)會 Insert Overwrite 到原表中,即圖中的 origindb.table。

Merge 流程舉例

下面用一個例子來具體說明 Merge 的流程。

數(shù)據(jù)表共 id、value 兩列,其中 id 是主鍵。在提取 Delta 數(shù)據(jù)時,對同一條數(shù)據(jù)的多次更新,只選擇最后更新的一條。所以對 id= 1 的數(shù)據(jù),Delta 表中記錄最后一條更新后的值 value=120。Delta 數(shù)據(jù)和存量數(shù)據(jù)做 Merge 后,最終結(jié)果中,新插入一條數(shù)據(jù)(id=4),兩條數(shù)據(jù)發(fā)生了更新(id= 1 和 id=2),一條數(shù)據(jù)未變(id=3)。

默認(rèn)情況下,我們采用 MySQL 表的主鍵作為這一判重的唯一鍵,業(yè)務(wù)也可以根據(jù)實(shí)際情況配置不同于 MySQL 的唯一鍵。

上面介紹了基于 Binlog 的數(shù)據(jù)采集和 ODS 數(shù)據(jù)還原的整體架構(gòu)。下面主要從兩個方面介紹我們解決的實(shí)際業(yè)務(wù)問題。

實(shí)踐一:分庫分表的支持

隨著業(yè)務(wù)規(guī)模的擴(kuò)大,MySQL 的分庫分表情況越來越多,很多業(yè)務(wù)的分表數(shù)目都在幾千個這樣的量級。而一般數(shù)據(jù)開發(fā)同學(xué)需要把這些數(shù)據(jù)聚合到一起進(jìn)行分析。如果對每個分表都進(jìn)行手動同步,再在 Hive 上進(jìn)行聚合,這個成本很難被我們接受。因此,我們需要在 ODS 層就完成分表的聚合。

首先,在 Binlog 實(shí)時采集時,我們支持把不同 DB 的 Binlog 寫入到同一個 Kafka Topic。用戶可以在申請 Binlog 采集時,同時勾選同一個業(yè)務(wù)邏輯下的多個物理 DB。通過在 Binlog 采集層的匯集,所有分庫的 Binlog 會寫入到同一張 Hive 表中,這樣下游在進(jìn)行 Merge 時,依然只需要讀取一張 Hive 表。

第二,Merge 任務(wù)的配置支持正則匹配。通過配置符合業(yè)務(wù)分表命名規(guī)則的正則表達(dá)式,Merge 任務(wù)就能了解自己需要聚合哪些 MySQL 表的 Binlog,從而選取相應(yīng)分區(qū)的數(shù)據(jù)來執(zhí)行。

這樣通過兩個層面的工作,就完成了分庫分表在 ODS 層的合并。

這里面有一個技術(shù)上的優(yōu)化,在進(jìn)行 Kafka2Hive 時,我們按業(yè)務(wù)分表規(guī)則對表名進(jìn)行了處理,把物理表名轉(zhuǎn)換成了邏輯表名。例如 userinfo123 這張表名會被轉(zhuǎn)換為 userinfo,其 Binlog 數(shù)據(jù)存儲在 original_binlog.user 表的 table_name=userinfo 分區(qū)中。這樣做的目的是防止過多的 HDFS 小文件和 Hive 分區(qū)造成的底層壓力。

實(shí)踐二:刪除事件的支持

Delete 操作在 MySQL 中非常常見,由于 Hive 不支持 Delete,如果想把 MySQL 中刪除的數(shù)據(jù)在 Hive 中刪掉,需要采用“迂回”的方式進(jìn)行。

對需要處理 Delete 事件的 Merge 流程,采用如下兩個步驟:

首先,提取出發(fā)生了 Delete 事件的數(shù)據(jù),由于 Binlog 本身記錄了事件類型,這一步很容易做到。將存量數(shù)據(jù)(表 A)與被刪掉的數(shù)據(jù)(表 B)在主鍵上做左外連接 (Left outer join),如果能夠全部 join 到雙方的數(shù)據(jù),說明該條數(shù)據(jù)被刪掉了。因此,選擇結(jié)果中表 B 對應(yīng)的記錄為 NULL 的數(shù)據(jù),即是應(yīng)當(dāng)被保留的數(shù)據(jù)。

然后,對上面得到的被保留下來的數(shù)據(jù),按照前面描述的流程做常規(guī)的 Merge。

感謝各位的閱讀!關(guān)于“DB 數(shù)據(jù)同步到數(shù)據(jù)倉庫的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學(xué)到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-04發(fā)表,共計(jì)5176字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 卢湾区| 恭城| 孟连| 山西省| 阿坝| 衡阳市| 安康市| 绥芬河市| 泰宁县| 子长县| 广河县| 东兰县| 云安县| 清原| 南乐县| 高台县| 胶州市| 中西区| 龙泉市| 宜州市| 金川县| 和顺县| 安吉县| 平远县| 凤山县| 德格县| 社会| 桦川县| 永康市| 仪征市| 长治市| 瑞安市| 唐海县| 南皮县| 长白| 许昌县| 彰武县| 多伦县| 亚东县| 上杭县| 南投县|