共計 2387 個字符,預計需要花費 6 分鐘才能閱讀完成。
Delta Lake 如何實現 CDC 實時入湖,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
什么是 CDC
Change Data Capture(CDC)用來跟蹤捕獲數據源的數據變化,并將這些變化同步到目標存儲 (如數據湖或數據倉庫),用于數據備份或后續分析,同步過程可以是分鐘 / 小時 / 天等粒度,也可以是實時同步。CDC 方案分為侵入式(intrusive manner) 和非傾入性 (non-intrusive manner) 兩種。
侵入式
侵入式方案直接請求數據源系統(如通過 JDBC 讀取數據),會給數據源系統帶來性能壓力。常見的方案如下:
最后更新時間(Last Modified)
源表需要有修改時間列,同步作業需要指定最后修改時間參數,表明同步某個時間點之后變更的數據。該方法不能同步刪除記錄的變更,同一條記錄多次變更只能記錄最后一次。
自增 id 列
源表需要有一個自增 id 列,同步作業需要指定上次同步的最大 id 值,同步上次之后新增的記錄行。該方法也不能同步刪除記錄的變更,而且老記錄的變更也無法感知。
非侵入式
非侵入性一般通過日志的方式記錄數據源的數據變化(如數據庫的 binlog),源庫需要開啟 binlog 的功能。數據源的每次操作都會被記錄到 binlog 中(如 insert/update/delete 等),能夠實時跟蹤數據插入 / 刪除 / 數據多次更新 /DDL 操作等。
示例:
insert into table testdb.test values(hangzhou ,1);update testdb.test set b=2 where a= hangzhou update testdb.test set b=3 where a= hangzhou delete from testdb.test where a= hangzhou
通過將 binlog 日志有序的回放到目標存儲中,從而實現對數據源的數據導出同步功能。
常見的 CDC 方案實現
開源常見的 CDC 方案實現主要有兩種:
Sqoop 離線同步
sqoop 是一個開源的數據同步工具,它可以將數據庫的數據同步到 HDFS/Hive 中,支持全量同步和增量同步,用戶可以配置小時 / 天的調度作業來定時同步數據。
sqoop 增量同步是一種侵入式的 CDC 方案,支持 Last Modified 和 Append 模式。
缺點:
直接 jdbc 請求源庫拉取數據,影響源庫性能
小時 / 天調度,實時性不高
無法同步源庫的刪除操作,Append 模式還不支持數據更新操作
binlog 實時同步
binlog 日志可以通過一些工具實時同步到 kafka 等消息中間件中,然后通過 Spark/Flink 等流引擎實時的回放 binlog 到目標存儲(如 Kudu/HBase 等)。
缺點:
Kudu/HBase 運維成本高
Kudu 在數據量大的有穩定性問題, HBase 不支持高吞吐的分析
Spark Streaming 實現回放 binlog 邏輯復雜,使用 java/scala 代碼具有一定門檻
Streaming SQL+Delta Lake 實時入湖方案
前面介紹了兩種常見的 CDC 方案,各自都有一些缺點。阿里云 E -MapReduce 團隊提供了一種新的 CDC 解決方案,利用自研的 Streaming SQL 搭配 Delta Lake 可以輕松實現 CDC 實時入湖。這套解決方案同時通過阿里云最新發布的數據湖構建(Data Lake Formation,DLF)服務提供一站式的入湖體驗。
Streaming SQL
Spark Streaming SQL 在 Spark Structured Streaming 之上提供了 SQL 能力,降低了實時業務開發的門檻,使得離線業務實時化更簡單方便。
下面以實時消費 SLS 為例:
# 創建 loghub 源表
spark-sql CREATE TABLE loghub_intput_tbl(content string)
USING loghub
OPTIONS
(...)
# 創建 delta 目標表
spark-sql CREATE TABLE delta_output_tbl(content string)
USING delta
OPTIONS
(...);
# 創建流式 SCAN
spark-sql CREATE SCAN loghub_table_intput_test_stream
ON loghub_intput_tbl
USING STREAM;
# 將 loghub 源表數據插入 delta 目標表
spark-sql INSERT INTO delta_output_tbl SELECT content FROM loghub_table_intput_test_stream;
Delta Lake
Delta Lake 是 Databricks 開源的一種數據湖格式,它在 parquet 格式之上,提供了 ACID 事務 / 元數據管理等能力,同時相比 parquet 具有更好的性能,能夠支持更豐富的數據應用場景(如數據更新 /schema 演化等)。
E-MapReduce 團隊在開源 Delta Lake 基礎上做了很多功能和性能的優化,如小文件合并 Optimize/DataSkipping/Zorder,SparkSQL/Streaming SQL/Hive/Presto 深度集成 Delta 等。
Streaming SQL+Delta Lake CDC 實時入湖
Spark Streaming SQL 提供了 Merge Into 的語法,搭配 Delta Lake 的實時寫入能力,可以很方便的實現 CDC 實時入湖方案。
如上圖所示,只需要 SQL 就能完成 CDC 實時入湖。
看完上述內容,你們掌握 Delta Lake 如何實現 CDC 實時入湖的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注丸趣 TV 行業資訊頻道,感謝各位的閱讀!