久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何快速掌握Fink SQL

171次閱讀
沒有評論

共計 4216 個字符,預(yù)計需要花費 11 分鐘才能閱讀完成。

這篇文章主要講解了“如何快速掌握 Fink SQL”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著丸趣 TV 小編的思路慢慢深入,一起來研究和學(xué)習(xí)“如何快速掌握 Fink SQL”吧!

1、導(dǎo)入所需要的的依賴包

dependency   groupId org.apache.flink /groupId   artifactId flink-table-planner_2.12 /artifactId   version 1.10.1 /version   /dependency   dependency   groupId org.apache.flink /groupId   artifactId flink-table-api-scala-bridge_2.12 /artifactId   version 1.10.1 /version   /dependency   dependency   groupId org.apache.flink /groupId   artifactId flink-csv /artifactId   version 1.10.1 /version   /dependency

flink-table-planner:planner 計劃器,是 table API 最主要的部分,提供了運行時環(huán)境和生成程序執(zhí)行計劃的  planner; flink-table-api-scala-bridge:bridge 橋接器,主要負(fù)責(zé) table API 和  DataStream/DataSet API 的連接支持,按照語言分 java 和 scala。

這里的兩個依賴,是 IDE 環(huán)境下運行需要添加的; 如果是生產(chǎn)環(huán)境,lib 目錄下默認(rèn)已經(jīng)有了 planner,就只需要有 bridge 就可以了。

當(dāng)然,如果想使用用戶自定義函數(shù),或是跟 kafka 做連接,需要有一個 SQL client,這個包含在 flink-table-common   里。

2、兩種 planner(old blink)的區(qū)別

鴻蒙官方戰(zhàn)略合作共建——HarmonyOS 技術(shù)社區(qū)

批流統(tǒng)一:Blink 將批處理作業(yè),視為流式處理的特殊情況。所以,blink 不支持表和 DataSet 之間的轉(zhuǎn)換,批處理作業(yè)將不轉(zhuǎn)換為 DataSet   應(yīng)用程序,而是跟流處理一樣,轉(zhuǎn)換為 DataStream 程序來處理。

因 為 批 流 統(tǒng) 一,Blink planner 也 不 支 持 BatchTableSource,而 使 用 有 界 的

Blink planner 只支持全新的目錄,不支持已棄用的 ExternalCatalog。

舊 planner 和 Blink planner 的 FilterableTableSource 實現(xiàn)不兼容。舊的 planner   會把 PlannerExpressions 下推到 filterableTableSource 中,而 blink planner 則會把 Expressions   下推。

基于字符串的鍵值配置選項僅適用于 Blink planner。

PlannerConfig 在兩個 planner 中的實現(xiàn)不同。

Blink planner 會將多個 sink 優(yōu)化在一個 DAG 中(僅在 TableEnvironment 上受支持,而在  StreamTableEnvironment 上不受支持)。而舊 planner 的優(yōu)化總是將每一個 sink 放在一個新的 DAG 中,其中所有 DAG   彼此獨立。

舊的 planner 不支持目錄統(tǒng)計,而 Blink planner 支持。

3、表 (Table) 的概念

TableEnvironment 可以注冊目錄 Catalog,并可以基于 Catalog 注冊表。它會維護(hù)一個 Catalog-Table 表之間的  map。表 (Table) 是由一個標(biāo)識符來指定的,由 3 部分組成:Catalog   名、數(shù)據(jù)庫 (database) 名和對象名(表名)。如果沒有指定目錄或數(shù)據(jù)庫,就使用當(dāng)前的默認(rèn)值。

4、連接到文件系統(tǒng)(Csv 格式)

連接外部系統(tǒng)在 Catalog 中注冊表,直接調(diào)用 tableEnv.connect()就可以,里面參數(shù)要傳入一個  ConnectorDescriptor,也就是 connector 描述器。對于文件系統(tǒng)的 connector 而言,flink 內(nèi)部已經(jīng)提供了,就叫做  FileSystem()。

5、測試案例 (新)

需求:將一個 txt 文本文件作為輸入流讀取數(shù)據(jù)過濾 id 不等于 sensor_1 的數(shù)據(jù)實現(xiàn)思路:  首先我們先構(gòu)建一個 table 的 env 環(huán)境通過 connect 提供的方法來讀取數(shù)據(jù)然后設(shè)置表結(jié)構(gòu)將數(shù)據(jù)注冊為一張表就可進(jìn)行我們的數(shù)據(jù)過濾了(使用 sql 或者流處理方式進(jìn)行解析)

準(zhǔn)備數(shù)據(jù)

sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9

代碼實現(xiàn)

import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.{DataTypes} import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} /** * @Package * @author  大數(shù)據(jù)老哥  * @date 2020/12/12 21:22 * @version V1.0 *  第一個 Flinksql 測試案例  */ object FlinkSqlTable { def main(args: Array[String]): Unit = { //  構(gòu)建運行流處理的運行環(huán)境  val env = StreamExecutionEnvironment.getExecutionEnvironment //  構(gòu)建 table 環(huán)境  val tableEnv = StreamTableEnvironment.create(env) // 通過  connect  讀取數(shù)據(jù)  tableEnv.connect(new FileSystem().path(D:\\d12\\Flink\\FlinkSql\\src\\main\\resources\\sensor.txt)) .withFormat(new Csv()) // 設(shè)置類型  .withSchema(new Schema() //  給數(shù)據(jù)添加元數(shù)信息  .field(id , DataTypes.STRING()) .field(time , DataTypes.BIGINT()) .field(temperature , DataTypes.DOUBLE()) ).createTemporaryTable(inputTable) //  創(chuàng)建一個臨時表  val resTable = tableEnv.from(inputTable) .select(*).filter(id ===  sensor_1) //  使用 sql 的方式查詢數(shù)據(jù)  var resSql = tableEnv.sqlQuery(select * from inputTable where id= sensor_1) //  將數(shù)據(jù)轉(zhuǎn)為流進(jìn)行輸出  resTable.toAppendStream[(String, Long, Double)].print(resTable) resSql.toAppendStream[(String, Long, Double)].print(resSql) env.execute(FlinkSqlWrodCount) } }

6、TableEnvironment 的作用

注冊 catalog

在內(nèi)部 catalog 中注冊表

執(zhí)行 SQL 查詢

注冊用戶自定義函數(shù)

注冊用戶自定義函數(shù)

保存對 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

在創(chuàng)建 TableEnv 的時候,可以多傳入一個 EnvironmentSettings 或者 TableConfig 參數(shù),可以用來配置  TableEnvironment 的一些特性。

7、老版本創(chuàng)建流處理批處理

7.1 老版本流處理

val settings = EnvironmentSettings.newInstance() .useOldPlanner() //  使用老版本  planner .inStreamingMode() //  流處理模式  .build() val tableEnv = StreamTableEnvironment.create(env, settings)

7.2 老版本批處理

val batchEnv = ExecutionEnvironment.getExecutionEnvironment val batchTableEnv = BatchTableEnvironment.create(batchEnv)

7.3 blink 版本的流處理環(huán)境

val bsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)

7.4 blink 版本的批處理環(huán)境

val bbSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode().build() val bbTableEnv = TableEnvironment.create(bbSettings)

感謝各位的閱讀,以上就是“如何快速掌握 Fink SQL”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對如何快速掌握 Fink SQL 這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是丸趣 TV,丸趣 TV 小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-07-27發(fā)表,共計4216字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 长宁区| 沂南县| 晋江市| 莱州市| 吉隆县| 历史| 丹寨县| 施秉县| 双牌县| 聊城市| 南投市| 大英县| 辉南县| 洪雅县| 宁安市| 兴宁市| 阳高县| 肇州县| 沅江市| 元朗区| 神池县| 荔浦县| 临澧县| 万盛区| 观塘区| 博野县| 章丘市| 宾阳县| 商水县| 浦县| 长沙县| 东源县| 明水县| 莫力| 普安县| 叙永县| 万载县| 宁夏| 九江县| 江源县| 嘉黎县|