共計(jì) 6476 個字符,預(yù)計(jì)需要花費(fèi) 17 分鐘才能閱讀完成。
This article walks through how to create an external partitioned table with Spark SQL: define a Parquet-backed table, convert CSV data to Parquet, and register the partition so the data can be queried.
I. Creating an external partitioned table with Spark SQL
1. Launch spark-sql:
spark-sql --queue spark --master yarn --deploy-mode client --num-executors 10 --executor-cores 2 --executor-memory 3G
2. Create the Parquet partitioned table in spark-sql:
create external table pgls.convert_parq(
bill_num string,
logis_id string,
store_id string,
store_code string,
creater_id string,
order_status INT,
pay_status INT,
order_require_varieties INT,
order_require_amount decimal(19,4),
order_rec_amount decimal(19,4),
order_rec_gpf decimal(19,4),
deli_fee FLOAT,
order_type INT,
last_modify_time timestamp,
order_submit_time timestamp
)
partitioned by(order_submit_date date)
row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
stored as parquetfile
location '/test/spark/convert/parquet/bill_parq/';
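Before loading any data it can be worth confirming that the table was created with the expected location and SerDe. describe formatted is standard Hive/Spark SQL; it is shown here only as an optional sanity check, not as part of the original steps:
describe formatted pgls.convert_parq;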
II. Converting CSV to Parquet
Code: org.apache.spark.ConvertToParquet.scala
package org.apache.spark
import com.ecfront.fs.operation.HDFSOperation
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
/**
 * Convert CSV to Parquet.
 * Arguments: input path, output path, number of partitions
 */
object ConvertToParquet {

  def main(args: Array[String]) {
    if (args.length != 3) {
      println("jar args: inputFiles outPath numpartitions")
      System.exit(0)
    }
    val inputPath = args(0)
    val outPath = args(1)
    val numPartitions = args(2).toInt

    println("==========================================")
    println("=========input: " + inputPath)
    println("=========output: " + outPath)
    println("==numPartitions: " + numPartitions)
    println("==========================================")

    // If the output directory already exists on HDFS, delete it first
    val fo = HDFSOperation(new Configuration())
    val existDir = fo.existDir(outPath)
    if (existDir) {
      println("HDFS exists outpath: " + outPath)
      println("start to delete ...")
      val isDelete = fo.deleteDir(outPath)
      if (isDelete) {
        println(outPath + " delete done.")
      }
    }

    val conf = new SparkConf()
    val sc = new SparkContext(conf)     // create the SparkContext from the SparkConf
    val sqlContext = new SQLContext(sc) // create the SQLContext from the SparkContext

    // the schema must match the column order of the CSV files
    val schema = StructType(Array(
      StructField("bill_num", DataTypes.StringType, false),
      StructField("logis_id", DataTypes.StringType, false),
      StructField("store_id", DataTypes.StringType, false),
      StructField("store_code", DataTypes.StringType, false),
      StructField("creater_id", DataTypes.StringType, false),
      StructField("order_status", DataTypes.IntegerType, false),
      StructField("pay_status", DataTypes.IntegerType, false),
      StructField("order_require_varieties", DataTypes.IntegerType, false),
      StructField("order_require_amount", DataTypes.createDecimalType(19, 4), false),
      StructField("order_rec_amount", DataTypes.createDecimalType(19, 4), false),
      StructField("order_rec_gpf", DataTypes.createDecimalType(19, 4), false),
      StructField("deli_fee", DataTypes.FloatType, false),
      StructField("order_type", DataTypes.IntegerType, false),
      StructField("last_modify_time", DataTypes.TimestampType, false),
      StructField("order_submit_time", DataTypes.TimestampType, false),
      StructField("order_submit_date", DataTypes.DateType, false)))

    convert(sqlContext, inputPath, schema, outPath, numPartitions)
  }

  // Convert CSV to Parquet
  def convert(sqlContext: SQLContext, inputpath: String, schema: StructType,
              outpath: String, numPartitions: Int) {
    // load the CSV files into a DataFrame using the spark-csv data source
    val df = sqlContext.read.format("com.databricks.spark.csv")
      .schema(schema).option("delimiter", ",").load(inputpath)
    // write as Parquet
    // df.write.parquet(outpath)                        // one output file per input block
    df.coalesce(numPartitions).write.parquet(outpath)   // custom number of output files
  }
}
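One note on the write step: coalesce only merges existing partitions, so it cannot produce more output files than the DataFrame already has. If a higher file count were ever needed, repartition (which shuffles) is the usual alternative; a minimal sketch, not part of the original job:
df.repartition(numPartitions).write.parquet(outpath)  // shuffles, so it can both decrease and increase the number of output files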
After packaging, upload the jar to a local directory: /soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar
The CSV files were generated on HDFS beforehand, under: /test/spark/convert/data/order/2016-05-01/
Run the job with:
spark-submit --queue spark --master yarn --num-executors 10 --executor-cores 2 --executor-memory 3G --class org.apache.spark.ConvertToParquet --name ConvertToParquet file:/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar /test/spark/convert/data/order/2016-05-01/ /test/spark/convert/parquet/bill_parq/order_submit_date=2016-05-01
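Note that the output path is the table's location plus an order_submit_date=2016-05-01 subdirectory, which is what lets the partition added in section III pick up the files without moving any data. Assuming the job finishes successfully, a quick way to check the generated Parquet files is the plain HDFS CLI (not from the original article, shown only as a sanity check):
hdfs dfs -ls /test/spark/convert/parquet/bill_parq/order_submit_date=2016-05-01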
Relevant parts of pom.xml:
1. Dependencies:
<dependencies>
    <!-- HDFS operations -->
    <dependency>
        <groupId>com.ecfront</groupId>
        <artifactId>ez-fs</artifactId>
        <version>0.9</version>
    </dependency>
    <!-- spark -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>
    <!-- spark csv: the Scala suffix must match the spark-core/spark-sql artifacts (_2.10 here) -->
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>
    <!-- hadoop -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
    </dependency>
</dependencies>
2. Plugins (including bundling the dependencies into the jar):
<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.1</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.0.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <configuration>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>1.4</version>
            <configuration>
                <createDependencyReducedPom>true</createDependencyReducedPom>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>org.apache.spark.ConvertToParquet</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
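With the shade plugin bound to the package phase and the main class set via the ManifestResourceTransformer, the runnable fat jar is normally produced by a standard Maven build (assuming the usual src/main/scala project layout):
mvn clean package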
III. Adding a partition to the table
Run in spark-sql:
alter table pgls.convert_parq add partition(order_submit_date='2016-05-01');
The corresponding data can then be queried with SQL:
select * from pgls.convert_parq where order_submit_date='2016-05-01' limit 5;
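To confirm the partition was registered in the metastore, the partition list can also be inspected directly (standard Hive/Spark SQL, shown as an optional check):
show partitions pgls.convert_parq;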