久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Spark

178次閱讀
沒有評論

共計(jì) 6476 個字符,預(yù)計(jì)需要花費(fèi) 17 分鐘才能閱讀完成。

這篇文章主要為大家展示了“Spark-sql 如何創(chuàng)建外部分區(qū)表”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓丸趣 TV 小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“Spark-sql 如何創(chuàng)建外部分區(qū)表”這篇文章吧。

一、Spark-sql 創(chuàng)建外部分區(qū)表

1. 使用 spark-sql

spark-sql --queue spark --master yarn --deploy-mode client --num-executors 10 --executor-cores 2 --executor-memory 3G

2.spark-sql 中創(chuàng)建 parquet 分區(qū)表:

create external table pgls.convert_parq(
bill_num string,
logis_id string,
store_id string,
store_code string,
creater_id string,
order_status INT,
pay_status INT,
order_require_varieties INT,
order_require_amount decimal(19,4),
order_rec_amount decimal(19,4),
order_rec_gpf decimal(19,4),
deli_fee FLOAT,
order_type INT,
last_modify_time timestamp,
order_submit_time timestamp
) 
partitioned by(order_submit_date date)
row format serde  org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe 
stored as parquetfile
location  /test/spark/convert/parquet/bill_parq/

二、CSV 轉(zhuǎn) Parquet

代碼:org.apache.spark.ConvertToParquet.scala

package org.apache.spark
import com.ecfront.fs.operation.HDFSOperation
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
* CSV  轉(zhuǎn)換為  parquet
*  參數(shù):輸入路徑,  輸出路徑,  分區(qū)數(shù)
object ConvertToParquet{def main(args: Array[String]) {if(args.length != 3){println( jar args: inputFiles outPath numpartitions)
System.exit(0)
val inputPath = args(0)
val outPath = args(1)
val numPartitions = args(2).toInt
println(==========================================)
println(=========input:  + inputPath )
println(=========output:  + outPath )
println(==numPartitions:  + numPartitions )
println(==========================================)
// 判斷輸出目錄是否存在,存在則刪除
val fo = HDFSOperation(new Configuration())
val existDir = fo.existDir(outPath)
if(existDir) {println( HDFS exists outpath:   + outPath)
println(start to delete ...)
val isDelete = fo.deleteDir(outPath)
if(isDelete){println(outPath +  delete done. )
val conf = new SparkConf()
val sc = new SparkContext(conf) // 參數(shù) SparkConf 創(chuàng)建 SparkContext,
val sqlContext = new SQLContext(sc) // 參數(shù) SparkContext 創(chuàng)建 SQLContext
val schema = StructType(Array(StructField( bill_num ,DataTypes.StringType,false),
StructField(logis_id ,DataTypes.StringType,false),
StructField(store_id ,DataTypes.StringType,false),
StructField(store_code ,DataTypes.StringType,false),
StructField(creater_id ,DataTypes.StringType,false),
StructField(order_status ,DataTypes.IntegerType,false),
StructField(pay_status ,DataTypes.IntegerType,false),
StructField(order_require_varieties ,DataTypes.IntegerType,false),
StructField(order_require_amount ,DataTypes.createDecimalType(19,4),false),
StructField(order_rec_amount ,DataTypes.createDecimalType(19,4),false),
StructField(order_rec_gpf ,DataTypes.createDecimalType(19,4),false),
StructField(deli_fee ,DataTypes.FloatType,false),
StructField(order_type ,DataTypes.IntegerType,false),
StructField(last_modify_time ,DataTypes.TimestampType,false),
StructField(order_submit_time ,DataTypes.TimestampType,false),
StructField(order_submit_date ,DataTypes.DateType,false)))
convert(sqlContext, inputPath, schema, outPath, numPartitions)
//CSV 轉(zhuǎn)換為 parquet
def convert(sqlContext: SQLContext, inputpath: String, schema: StructType, outpath: String, numPartitions: Int) {
//  將 text 導(dǎo)入到 DataFrame
val df = sqlContext.read.format(com.databricks.spark.csv).
schema(schema).option(delimiter ,  ,).load(inputpath)
//  轉(zhuǎn)換為 parquet
// df.write.parquet(outpath) //  轉(zhuǎn)換時(shí)以 block 數(shù)為分區(qū)數(shù)
df.coalesce(numPartitions).write.parquet(outpath) // 自定義分區(qū)數(shù)
}
 打包后 jar 上傳至本地目錄:/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar
事先在 HDFS 上生成 CSV 文件,HDFS 目錄:/test/spark/convert/data/order/2016-05-01/
執(zhí)行命令:
spark-submit --queue spark --master yarn --num-executors 10 --executor-cores 2 --executor-memory 3G --class org.apache.spark.ConvertToParquet --name ConvertToParquet file:/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar /test/spark/convert/data/order/2016-05-01/ /test/spark/convert/parquet/bill_parq/order_submit_date=2016-05-01

pom.xml 相關(guān)內(nèi)容:

1. 依賴包:

dependencies 
 !--  操作 HDFS -- 
 dependency 
  groupId com.ecfront /groupId 
  artifactId ez-fs /artifactId 
  version 0.9 /version 
  /dependency 
 !--spark -- 
 dependency 
  groupId org.apache.spark /groupId 
  artifactId spark-core_2.10 /artifactId 
  version 1.6.1 /version 
  /dependency 
  dependency 
  groupId org.apache.spark /groupId 
  artifactId spark-sql_2.10 /artifactId 
  version 1.6.1 /version 
  /dependency 
 !--spark csv-- 
 dependency 
  groupId com.databricks /groupId 
  artifactId spark-csv_2.11 /artifactId 
  version 1.4.0 /version 
  /dependency 
 !--hadoop -- 
 dependency 
  groupId org.apache.hadoop /groupId 
  artifactId hadoop-client /artifactId 
  version 2.6.0 /version 
  /dependency 
 /dependencies

    2.plugins(含打入依賴包)

build 
  pluginManagement 
  plugins 
  plugin 
  groupId net.alchim31.maven /groupId 
  artifactId scala-maven-plugin /artifactId 
  version 3.2.1 /version 
  /plugin 
  plugin 
  groupId org.apache.maven.plugins /groupId 
  artifactId maven-compiler-plugin /artifactId 
  version 2.0.2 /version 
  /plugin 
  plugin 
  groupId org.apache.maven.plugins /groupId 
  artifactId maven-shade-plugin /artifactId 
  version 1.4 /version 
  configuration 
  filters 
  filter 
  artifact *:* /artifact 
  excludes 
  exclude META-INF/*.SF /exclude 
  exclude META-INF/*.DSA /exclude 
  exclude META-INF/*.RSA /exclude 
  /excludes 
  /filter 
  /filters 
  /configuration 
  /plugin 
  /plugins 
  /pluginManagement 
  plugins 
  plugin 
  groupId net.alchim31.maven /groupId 
  artifactId scala-maven-plugin /artifactId 
  executions 
  execution 
  id scala-compile-first /id 
  phase process-resources /phase 
  goals 
  goal add-source /goal 
  goal compile /goal 
  /goals 
  /execution 
  execution 
  id scala-test-compile /id 
  phase process-test-resources /phase 
  goals 
  goal testCompile /goal 
  /goals 
  /execution 
  /executions 
  /plugin 
  plugin 
  groupId org.apache.maven.plugins /groupId 
  artifactId maven-compiler-plugin /artifactId 
  executions 
  execution 
  phase compile /phase 
  goals 
  goal compile /goal 
  /goals 
  /execution 
  /executions 
  /plugin 
  plugin 
  groupId org.apache.maven.plugins /groupId 
  artifactId maven-shade-plugin /artifactId 
  version 1.4 /version 
  configuration 
  createDependencyReducedPom true /createDependencyReducedPom 
  /configuration 
  executions 
  execution 
  phase package /phase 
  goals 
  goal shade /goal 
  /goals 
  configuration 
  transformers 
  transformer
implementation= org.apache.maven.plugins.shade.resource.ServicesResourceTransformer / 
  transformer
implementation= org.apache.maven.plugins.shade.resource.ManifestResourceTransformer 
  mainClass org.apache.spark.ConvertToParquet /mainClass 
  /transformer 
  /transformers 
  /configuration 
  /execution 
  /executions 
  /plugin 
  /plugins 
 /build

三、表添加分區(qū)

spark-sql 下執(zhí)行

alter table pgls.convert_parq add partition(order_submit_date= 2016-05-01

可通過 sql 查詢到相應(yīng)數(shù)據(jù):

select * from pgls.convert_parq where order_submit_date= 2016-05-01  limit 5;

Spark-sql 如何創(chuàng)建外部分區(qū)表

以上是“Spark-sql 如何創(chuàng)建外部分區(qū)表”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注丸趣 TV 行業(yè)資訊頻道!

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-25發(fā)表,共計(jì)6476字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 沂南县| 女性| 康马县| 湾仔区| 盐城市| 南昌县| 温州市| 厦门市| 和顺县| 拜城县| 峡江县| 霍州市| 瓮安县| 忻城县| 绥滨县| 望城县| 内丘县| 蓬莱市| 庆城县| 清流县| 建宁县| 鄢陵县| 岐山县| 玛曲县| 太原市| 山阴县| 讷河市| 张家口市| 常州市| 通州市| 秀山| 会泽县| 阆中市| 开封市| 彭山县| 嘉荫县| 金川县| 平邑县| 拉孜县| 永城市| 探索|