共計 3025 個字符,預計需要花費 8 分鐘才能閱讀完成。
本篇內容介紹了“RDS 與 POLARDB 歸檔到 X -Pack Spark 計算的方法”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓丸趣 TV 小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
X-Pack Spark 服務通過外部計算資源的方式,為 Redis、Cassandra、MongoDB、HBase、RDS 存儲服務提供復雜分析、流式處理及入庫、機器學習的能力,從而更好的解決用戶數據處理相關場景問題。
RDS POLARDB 分表歸檔到 X -Pack Spark 步驟
一鍵關聯 POLARDB 到 Spark 集群
POLARDB 表存儲
在 database‘test1’中每 5 分鐘生成一張表,這里假設為表 test1、test2、test2、…
具體的建表語句如下:
* 請左右滑動閱覽
CREATE TABLE `test1` ( `a` int(11) NOT NULL,
`b` time DEFAULT NULL,
`c` double DEFAULT NULL,
PRIMARY KEY (`a`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
歸檔到 Spark 的調試
x-pack spark 提供交互式查詢模式支持直接在控制臺提交 sql、python 腳本、scala code 來調試。
1、首先創建一個交互式查詢的 session,在其中添加 mysql-connector 的 jar 包。
2、創建交互式查詢
以 pyspark 為例,下面是具體歸檔 demo 的代碼:
* 請左右滑動閱覽
spark.sql(drop table sparktest).show()
# 創建一張 spark 表,三級分區,分別是天、小時、分鐘,最后一級分鐘用來存儲具體的 5 分鐘的一張 polardb 表達的數據。字段和 polardb 里面的類型一致
spark.sql(CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string)
USING parquet PARTITIONED BY (dt ,hh ,mm ) ).show()
#本例子在 polardb 里面創建了 databse test1,具有三張表 test1 ,test2,test3, 這里遍歷這三張表,每個表存儲 spark 的一個 5min 的分區
# CREATE TABLE `test1` (# `a` int(11) NOT NULL,
# `b` time DEFAULT NULL,
# `c` double DEFAULT NULL,
# PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4):
# 構造 polardb 的表名
dbtable = test1. + test + str(num)
#spark 外表關聯 polardb 對應的表
externalPolarDBTableNow = spark.read \
.format(jdbc) \
.option(driver , com.mysql.jdbc.Driver) \
.option(url , jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306) \
.option(dbtable , dbtable) \
.option(user , name) \
.option(password , xxx*) \
.load().registerTempTable( polardbTableTemp)
# 生成本次 polardb 表數據要寫入的 spark 表的分區信息
(dtValue, hhValue, mmValue) = (20191015 , 13 , str(05 * num))
# 執行導數據 sql
spark.sql(insert into sparktest partition(dt= %s ,hh= %s , mm=%s )
select * from polardbTableTemp % (dtValue, hhValue, mmValue)).show()
# 刪除臨時的 spark 映射 polardb 表的 catalog
spark.catalog.dropTempView(polardbTableTemp)
# 查看下分區以及統計下數據,主要用來做測試驗證,實際運行過程可以刪除
spark.sql(show partitions sparktest).show(1000, False)
spark.sql(select count(*) from sparktest ).show()
歸檔作業上生產
交互式查詢定位為臨時查詢及調試,生產的作業還是建議使用 spark 作業的方式運行,使用文檔參考。這里以 pyspark 作業為例:
/polardb/polardbArchiving.py 內容如下:
* 請左右滑動閱覽
# -*- coding: UTF-8 -*-
from __future__ import print_function
import sys
from operator import add
from pyspark.sql import SparkSession
if __name__ == __main__ :
spark = SparkSession \
.builder \
.appName(PolardbArchiving) \
.enableHiveSupport() \
.getOrCreate()
spark.sql(drop table sparktest).show()
# 創建一張 spark 表,三級分區,分別是天、小時、分鐘,最后一級分鐘用來存儲具體的 5 分鐘的一張 polardb 表達的數據。字段和 polardb 里面的類型一致
spark.sql(CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string)
USING parquet PARTITIONED BY (dt ,hh ,mm ) ).show()
# 本例子在 polardb 里面創建了 databse test1,具有三張表 test1 ,test2,test3, 這里遍歷這三張表,每個表存儲 spark 的一個 5min 的分區
# CREATE TABLE `test1` ( # `a` int(11) NOT NULL,
# `b` time DEFAULT NULL,
# `c` double DEFAULT NULL,
# PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4):
# 構造 polardb 的表名
dbtable = test1.
“RDS 與 POLARDB 歸檔到 X -Pack Spark 計算的方法”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注丸趣 TV 網站,丸趣 TV 小編將為大家輸出更多高質量的實用文章!