久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

RDS與POLARDB歸檔到X

189次閱讀
沒有評論

共計 3025 個字符,預計需要花費 8 分鐘才能閱讀完成。

本篇內容介紹了“RDS 與 POLARDB 歸檔到 X -Pack Spark 計算的方法”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓丸趣 TV 小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

X-Pack Spark 服務通過外部計算資源的方式,為 Redis、Cassandra、MongoDB、HBase、RDS 存儲服務提供復雜分析、流式處理及入庫、機器學習的能力,從而更好的解決用戶數據處理相關場景問題。

RDS POLARDB 分表歸檔到 X -Pack Spark 步驟

一鍵關聯 POLARDB 到 Spark 集群

POLARDB 表存儲

在 database‘test1’中每 5 分鐘生成一張表,這里假設為表 test1、test2、test2、…

具體的建表語句如下:

* 請左右滑動閱覽

 CREATE TABLE `test1` ( `a` int(11) NOT NULL,
 `b` time DEFAULT NULL, 
 `c` double DEFAULT NULL,
 PRIMARY KEY (`a`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8

歸檔到 Spark 的調試

x-pack spark 提供交互式查詢模式支持直接在控制臺提交 sql、python 腳本、scala code 來調試。

1、首先創建一個交互式查詢的 session,在其中添加 mysql-connector 的 jar 包。

2、創建交互式查詢

以 pyspark 為例,下面是具體歸檔 demo 的代碼:

* 請左右滑動閱覽

spark.sql(drop table sparktest).show()
#  創建一張 spark 表,三級分區,分別是天、小時、分鐘,最后一級分鐘用來存儲具體的 5 分鐘的一張 polardb 表達的數據。字段和 polardb 里面的類型一致
spark.sql(CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string)  
  USING parquet PARTITIONED BY (dt ,hh ,mm ) ).show()
#本例子在 polardb 里面創建了 databse test1,具有三張表 test1 ,test2,test3, 這里遍歷這三張表,每個表存儲 spark 的一個 5min 的分區
# CREATE TABLE `test1` (# `a` int(11) NOT NULL,
# `b` time DEFAULT NULL,
# `c` double DEFAULT NULL,
# PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4): 
 # 構造 polardb 的表名
 dbtable =  test1.  +  test  + str(num)
 #spark 外表關聯 polardb 對應的表
 externalPolarDBTableNow = spark.read \
 .format(jdbc) \
 .option(driver ,  com.mysql.jdbc.Driver) \
 .option(url ,  jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306) \
 .option(dbtable , dbtable) \
 .option(user ,  name) \
 .option(password ,  xxx*) \
 .load().registerTempTable( polardbTableTemp)
 # 生成本次 polardb 表數據要寫入的 spark 表的分區信息
 (dtValue, hhValue, mmValue) = (20191015 ,  13 , str(05 * num))
 # 執行導數據 sql 
 spark.sql(insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  
  select * from polardbTableTemp   % (dtValue, hhValue, mmValue)).show()
 # 刪除臨時的 spark 映射 polardb 表的 catalog
 spark.catalog.dropTempView(polardbTableTemp)
 # 查看下分區以及統計下數據,主要用來做測試驗證,實際運行過程可以刪除
 spark.sql(show partitions sparktest).show(1000, False)
 spark.sql(select count(*) from sparktest ).show()

歸檔作業上生產

交互式查詢定位為臨時查詢及調試,生產的作業還是建議使用 spark 作業的方式運行,使用文檔參考。這里以 pyspark 作業為例:

/polardb/polardbArchiving.py 內容如下:

* 請左右滑動閱覽

# -*- coding: UTF-8 -*-
from __future__ import print_function
import sys
from operator import add
from pyspark.sql import SparkSession
if __name__ ==  __main__ :
 spark = SparkSession \
 .builder \
 .appName(PolardbArchiving) \
 .enableHiveSupport() \
 .getOrCreate()
 spark.sql(drop table sparktest).show()
 #  創建一張 spark 表,三級分區,分別是天、小時、分鐘,最后一級分鐘用來存儲具體的 5 分鐘的一張 polardb 表達的數據。字段和 polardb 里面的類型一致
 spark.sql(CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string)  
  USING parquet PARTITIONED BY (dt ,hh ,mm ) ).show()
 # 本例子在 polardb 里面創建了 databse test1,具有三張表 test1 ,test2,test3, 這里遍歷這三張表,每個表存儲 spark 的一個 5min 的分區
 # CREATE TABLE `test1` ( # `a` int(11) NOT NULL,
 # `b` time DEFAULT NULL,
 # `c` double DEFAULT NULL,
 # PRIMARY KEY (`a`)
 # ) ENGINE=InnoDB DEFAULT CHARSET=utf8
 for num in range(1, 4):
 # 構造 polardb 的表名
 dbtable =  test1.

“RDS 與 POLARDB 歸檔到 X -Pack Spark 計算的方法”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注丸趣 TV 網站,丸趣 TV 小編將為大家輸出更多高質量的實用文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-04發表,共計3025字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 开平市| 尼勒克县| 英山县| 安达市| 宁武县| 合阳县| 兴国县| 八宿县| 张北县| 古浪县| 乐业县| 巢湖市| 大兴区| 苏尼特左旗| 渝中区| 仙游县| 洱源县| 东平县| 西盟| 深泽县| 无棣县| 新和县| 托克托县| 沂南县| 湘潭市| 武定县| 鄂伦春自治旗| 澄江县| 古浪县| 江华| 陇南市| 河西区| 阜康市| 清苑县| 香格里拉县| 平潭县| 衡水市| 大城县| 阿鲁科尔沁旗| 唐海县| 电白县|