久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Databricks如何使用Spark Streaming和Delta Lake對流式數(shù)據(jù)進行數(shù)據(jù)質(zhì)量監(jiān)控

158次閱讀
沒有評論

共計 3157 個字符,預計需要花費 8 分鐘才能閱讀完成。

行業(yè)資訊    
服務器    
云計算    
Databricks 如何使用 Spark Streaming 和 Delta Lake 對流式數(shù)據(jù)進行數(shù)據(jù)質(zhì)量監(jiān)控

本篇文章給大家分享的是有關 Databricks 如何使用 Spark Streaming 和 Delta Lake 對流式數(shù)據(jù)進行數(shù)據(jù)質(zhì)量監(jiān)控,丸趣 TV 小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著丸趣 TV 小編一起來看看吧。

丸趣 TV 小編主要對 Databricks 如何使用 Spark Streaming 和 Delta Lake 對流式數(shù)據(jù)進行數(shù)據(jù)質(zhì)量監(jiān)控的方法和架構進行了介紹,下面探討了一種數(shù)據(jù)管理架構,該架構可以在數(shù)據(jù)到達時,通過主動監(jiān)控和分析來檢測流式數(shù)據(jù)中損壞或不良的數(shù)據(jù),并且不會造成瓶頸。

構建流式數(shù)據(jù)分析和監(jiān)控流程

在 Databricks,我們看到客戶中不斷涌現(xiàn)出許多數(shù)據(jù)處理模式,這些新模式的產(chǎn)生推動了可能的極限,在速度和質(zhì)量問題上也不例外。為了幫助解決這一矛盾,我們開始考慮使用正確的工具,不僅可以支持所需的數(shù)據(jù)速度,還可以提供可接受的數(shù)據(jù)質(zhì)量水平。Structured Streaming 和 Delta Lake 非常適合用于數(shù)據(jù)獲取和存儲層,因為他們能夠配合創(chuàng)造一個具有擴展性、容錯性和類實時的系統(tǒng),并且具有 exactly-once 處理保證。

為企業(yè)數(shù)據(jù)質(zhì)量分析找到可接受的工具要困難一些,特別是這個工具需要具有對數(shù)據(jù)質(zhì)量指標的狀態(tài)匯總的能力。另外,還需要能夠?qū)φ麄€數(shù)據(jù)集進行檢查(例如檢測出多少比例的記錄為空值),這些都會隨著所提取的數(shù)據(jù)量的增加而增加計算成本。這對所有流式系統(tǒng)而言都是需要的,這一要求就排除了很多可用的工具。

在我們最初的解決方案中,我們選擇了 Amazon 的數(shù)據(jù)質(zhì)量檢測工具 Deequ,因為它能提供簡單而強大的 API,有對數(shù)據(jù)質(zhì)量指標進行狀態(tài)聚合的能力,以及對 Scala 的支持。將來,其他 Spark 原生的工具將提供額外的選擇。

流式數(shù)據(jù)質(zhì)量監(jiān)控的實現(xiàn)

我們通過在 EC2 實例上運行一個小型的 Kafka producer 來模擬數(shù)據(jù)流,該實例將模擬的股票交易信息寫入 Kafka topic,并使用原生的 Databricks 連接器將這些數(shù)據(jù)導入到 Delta Lake 表當中。為了展示 Spark Streaming 中數(shù)據(jù)質(zhì)量檢查的功能,我們選擇在整個流程中實現(xiàn) Deequ 的不同功能:

根據(jù)歷史數(shù)據(jù)生成約束條件;

使用 foreachBatch 算子對到達的數(shù)據(jù)進行增量質(zhì)量分析;

使用 foreachBatch 算子對到達的數(shù)據(jù)執(zhí)行(較小的)單元測試,并將質(zhì)量不佳的 batch 隔離到質(zhì)量不佳記錄表中;

對于每個到達的 batch,將最新的狀態(tài)指標寫入到 Delta 表當中;

對整個數(shù)據(jù)集定期執(zhí)行(較大的)單元測試,并在 MLFlow 中跟蹤結果;

根據(jù)驗證結果發(fā)送通知(如通過電子郵件或 Slack);

捕獲 MLFlow 中的指標以進行可視化和記錄。

我們結合了 MLFlow 來跟蹤一段時間內(nèi)數(shù)據(jù)性能指標的質(zhì)量、Delta 表的版本迭代以及結合了一個用于通知和告警的 Slack 連接器。整個流程可以用如下的圖片進行表示:

由于 Spark 中具有統(tǒng)一的批處理 / 流式處理接口,因此我們能夠在這個流程的任何位置提取報告、告警和指標,作為實時更新或批處理快照。這對于設置觸發(fā)器或限制特別有用,因此,如果某個指標超過了閾值,則可以執(zhí)行數(shù)據(jù)質(zhì)量改善措施。還要注意的是,我們并沒有對初始到達的原始數(shù)據(jù)造成影響,這些數(shù)據(jù)將立即提交到我們的 Delta 表,這意味著我們不會限制數(shù)據(jù)輸入的速率。下游系統(tǒng)可以直接從該表中讀取數(shù)據(jù),如果超過了上述任何觸發(fā)條件或質(zhì)量閾值,則可能會中斷。此外,我們可以輕松地創(chuàng)建一個排除質(zhì)量不佳記錄的 view 以提供一個干凈的表。

在一個較高的層次,執(zhí)行我們的數(shù)據(jù)質(zhì)量跟蹤和驗證的代碼如下所示:

spark.readStream.table(trades_delta).writeStream.foreachBatch {(batchDF: DataFrame, batchId: Long) = 
   // reassign our current state to the previous next state    val stateStoreCurr = stateStoreNext
   // run analysis on the current batch, aggregate with saved state    val metricsResult = AnalysisRunner.run(data=batchDF, ...)        // verify the validity of our current microbatch    val verificationResult = VerificationSuite()        .onData(batchDF)        .addCheck(...).run()
   // if verification fails, write batch to bad records table    if (verificationResult.status != CheckStatus.Success) {...}
   // write the current results into the metrics table    Metric_results.write    .format(delta)    .mode(overwrite)    .saveAsTable(deequ_metrics)}.start()

使用數(shù)據(jù)質(zhì)量工具 Deequ

在 Databricks 中使用 Deequ 是相對比較容易的事情,你需要首先定義一個 analyzer,然后在 dataframe 上運行該 analyzer。例如,我們可以跟蹤 Deequ 本地提供的幾個相關指標檢查,包括檢查數(shù)量和價格是否為非負數(shù)、原始 IP 地址是否不為空以及符號字段在所有事務中的唯一性。Deequ 的 StateProvider 對象在流式數(shù)據(jù)配置中特別有用,它能允許用戶將我們指標的狀態(tài)保存在內(nèi)存或磁盤中,并在以后匯總這些指標。這意味著每個處理的批次僅分析該批次中的數(shù)據(jù)記錄,而不會分析整個表。即使隨著數(shù)據(jù)大小的增長,這也可以使性能保持相對穩(wěn)定,這在長時間運行的生產(chǎn)環(huán)境中很重要,因為生產(chǎn)環(huán)境需要在任意數(shù)量的數(shù)據(jù)上保持一致。

MLFlow 還可以很好地跟蹤指標隨時間的演變,在我們的 notebook 中,我們跟蹤在 foreachBatch 代碼中分析的所有 Deequ 約束作為指標,并使用 Delta 的 versionID 和時間戳作為參數(shù)。在 Databricks 的 notebook 中,集成的 MLFlow 服務對于指標跟蹤特別方便。

通過使用 Structured Streaming、Delta Lake 和 Deequ,我們能夠消除傳統(tǒng)情況下數(shù)據(jù)質(zhì)量和速度之間的權衡,而專注于實現(xiàn)兩者的可接受水平。這里特別重要的是靈活性——不僅在如何處理不良記錄(隔離、報錯、告警等),而且在體系結構上(例如何時以及在何處執(zhí)行檢查?)和生態(tài)上(如何使用我們的數(shù)據(jù)?)。開源技術(如 Delta Lake、Structured Streaming 和 Deequ)是這種靈活性的關鍵。隨著技術的發(fā)展,能夠使用最新最、最強大的解決方案是提升其競爭優(yōu)勢的驅(qū)動力。最重要的是,你的數(shù)據(jù)的速度和質(zhì)量一定不能對立,而要保持一致,尤其是在流式數(shù)據(jù)處理越來越靠近核心業(yè)務運營時。很快,這將不會是一種選擇,而是一種期望和要求,我們正朝著這個未來方向一次一小步地不斷前進。

以上就是 Databricks 如何使用 Spark Streaming 和 Delta Lake 對流式數(shù)據(jù)進行數(shù)據(jù)質(zhì)量監(jiān)控,丸趣 TV 小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降摹OM隳芡ㄟ^這篇文章學到更多知識。更多詳情敬請關注丸趣 TV 行業(yè)資訊頻道。

正文完
 
丸趣
版權聲明:本站原創(chuàng)文章,由 丸趣 2023-08-16發(fā)表,共計3157字。
轉(zhuǎn)載說明:除特殊說明外本站除技術相關以外文章皆由網(wǎng)絡搜集發(fā)布,轉(zhuǎn)載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 濉溪县| 台东市| 离岛区| 定西市| 茌平县| 西林县| 两当县| 海口市| 大英县| 京山县| 沂源县| 股票| 冷水江市| 淅川县| 达拉特旗| 新兴县| 洪泽县| 资源县| 怀柔区| 七台河市| 巴塘县| 高平市| 扶沟县| 团风县| 凌云县| 应城市| 舒城县| 铜陵市| 贵溪市| 肥东县| 乐安县| 濮阳市| 余姚市| 长岛县| 孝义市| 江华| 昭平县| 平邑县| 景东| 蒙山县| 临西县|