共計 9274 個字符,預(yù)計需要花費 24 分鐘才能閱讀完成。
今天就跟大家聊聊有關(guān) Spark SQL 數(shù)據(jù)加載和保存的實例分析,可能很多人都不太了解,為了讓大家更加了解,丸趣 TV 小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
一、前置知識詳解 Spark SQL 重要是操作 DataFrame,DataFrame 本身提供了 save 和 load 的操作,Load:可以創(chuàng)建 DataFrame,Save:把 DataFrame 中的數(shù)據(jù)保存到文件或者說與具體的格式來指明我們要讀取的文件的類型以及與具體的格式來指出我們要輸出的文件是什么類型。
二、Spark SQL 讀寫數(shù)據(jù)代碼實戰(zhàn)
import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.*;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import java.util.ArrayList;import java.util.List;public class SparkSQLLoadSaveOps { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster(local).setAppName(SparkSQLLoadSaveOps JavaSparkContext sc = new JavaSparkContext(conf); SQLContext = new SQLContext(sc); /** * read() 是 DataFrameReader 類型,load 可以將數(shù)據(jù)讀取出來 */ DataFrame peopleDF = sqlContext.read().format(json).load(E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json /** * 直接對 DataFrame 進(jìn)行操作 * Json: 是一種自解釋的格式,讀取 Json 的時候怎么判斷其是什么格式? * 通過掃描整個 Json。掃描之后才會知道元數(shù)據(jù) */ // 通過 mode 來指定輸出文件的是 append。創(chuàng)建新文件來追加文件 peopleDF.select( name).write().mode(SaveMode.Append).save(E:\\personNames }}
讀取過程源碼分析如下:1. read 方法返回 DataFrameReader,用于讀取數(shù)據(jù)。
/** * :: Experimental :: * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]]. * {{{ * sqlContext.read.parquet( /path/to/file.parquet) * sqlContext.read.schema(schema).json(/path/to/file.json) * }}} * * @group genericdata * @since 1.4.0 */@Experimental// 創(chuàng)建 DataFrameReader 實例,獲得了 DataFrameReader 引用 def read: DataFrameReader = new DataFrameReader(this)
2. 然后再調(diào)用 DataFrameReader 類中的 format,指出讀取文件的格式。
/** * Specifies the input data source format. * * @since 1.4.0 */def format(source: String): DataFrameReader = { this.source = source this}
3. 通過 DtaFrameReader 中 load 方法通過路徑把傳入過來的輸入變成 DataFrame。
/** * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by * a local or distributed file system). * * @since 1.4.0 */// TODO: Remove this one in Spark 2.0.def load(path: String): DataFrame = { option( path , path).load()}
至此,數(shù)據(jù)的讀取工作就完成了,下面就對 DataFrame 進(jìn)行操作。下面就是寫操作!!!
1. 調(diào)用 DataFrame 中 select 函數(shù)進(jìn)行對列篩選
/** * Selects a set of columns. This is a variant of `select` that can only select * existing columns using column names (i.e. cannot construct expressions). * * {{{ * // The following two are equivalent: * df.select( colA , colB) * df.select($ colA , $ colB) * }}} * @group dfops * @since 1.3.0 */@scala.annotation.varargsdef select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)
2. 然后通過 write 將結(jié)果寫入到外部存儲系統(tǒng)中。
/** * :: Experimental :: * Interface for saving the content of the [[DataFrame]] out into external storage. * * @group output * @since 1.4.0 */@Experimentaldef write: DataFrameWriter = new DataFrameWriter(this)
3. 在保持文件的時候 mode 指定追加文件的方式
/** * Specifies the behavior when data or table already exists. Options include:// Overwrite 是覆蓋 * - `SaveMode.Overwrite`: overwrite the existing data.// 創(chuàng)建新的文件,然后追加 * - `SaveMode.Append`: append the data. * - `SaveMode.Ignore`: ignore the operation (i.e. no-op). * - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. * * @since 1.4.0 */def mode(saveMode: SaveMode): DataFrameWriter = { this.mode = saveMode this}
4. 最后,save() 方法觸發(fā) action,將文件輸出到指定文件中。
/** * Saves the content of the [[DataFrame]] at the specified path. * * @since 1.4.0 */def save(path: String): Unit = { this.extraOptions += ( path - path) save()}
三、Spark SQL 讀寫整個流程圖如下
四、對于流程中部分函數(shù)源碼詳解
DataFrameReader.Load()
1. Load()返回 DataFrame 類型的數(shù)據(jù)集合,使用的數(shù)據(jù)是從默認(rèn)的路徑讀取。
/** * Returns the dataset stored at path as a DataFrame, * using the default data source configured by spark.sql.sources.default. * * @group genericdata * @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0. */@deprecated(Use read.load(path). This will be removed in Spark 2.0. , 1.4.0 )def load(path: String): DataFrame = {// 此時的 read 就是 DataFrameReader read.load(path)}
2. 追蹤 load 源碼進(jìn)去,源碼如下:在 DataFrameReader 中的方法。Load() 通過路徑把輸入傳進(jìn)來變成一個 DataFrame。
/** * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by * a local or distributed file system). * * @since 1.4.0 */// TODO: Remove this one in Spark 2.0.def load(path: String): DataFrame = { option( path , path).load()}
3. 追蹤 load 源碼如下:
/** * Loads input in as a [[DataFrame]], for data sources that don t require a path (e.g. external * key-value stores). * * @since 1.4.0 */def load(): DataFrame = {// 對傳入的 Source 進(jìn)行解析 val resolved = ResolvedDataSource( sqlContext, userSpecifiedSchema = userSpecifiedSchema, partitionColumns = Array.empty[String], provider = source, options = extraOptions.toMap) DataFrame(sqlContext, LogicalRelation(resolved.relation))}
DataFrameReader.format()
1. Format:具體指定文件格式,這就獲得一個巨大的啟示是:如果是 Json 文件格式可以保持為 Parquet 等此類操作。Spark SQL 在讀取文件的時候可以指定讀取文件的類型。例如,Json,Parquet.
/** * Specifies the input data source format.Built-in options include “parquet”,”json”,etc. * * @since 1.4.0 */def format(source: String): DataFrameReader = { this.source = source //FileType this}
DataFrame.write()
1. 創(chuàng)建 DataFrameWriter 實例
/** * :: Experimental :: * Interface for saving the content of the [[DataFrame]] out into external storage. * * @group output * @since 1.4.0 */@Experimentaldef write: DataFrameWriter = new DataFrameWriter(this)
1
2. 追蹤 DataFrameWriter 源碼如下:以 DataFrame 的方式向外部存儲系統(tǒng)中寫入數(shù)據(jù)。
/** * :: Experimental :: * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems, * key-value stores, etc). Use [[DataFrame.write]] to access this. * * @since 1.4.0 */@Experimentalfinal class DataFrameWriter private[sql](df: DataFrame) {
DataFrameWriter.mode()
1. Overwrite 是覆蓋,之前寫的數(shù)據(jù)全都被覆蓋了。Append: 是追加,對于普通文件是在一個文件中進(jìn)行追加,但是對于 parquet 格式的文件則創(chuàng)建新的文件進(jìn)行追加。
/** * Specifies the behavior when data or table already exists. Options include: * - `SaveMode.Overwrite`: overwrite the existing data. * - `SaveMode.Append`: append the data. * - `SaveMode.Ignore`: ignore the operation (i.e. no-op).// 默認(rèn)操作 * - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. * * @since 1.4.0 */def mode(saveMode: SaveMode): DataFrameWriter = { this.mode = saveMode this}
2. 通過模式匹配接收外部參數(shù)
/** * Specifies the behavior when data or table already exists. Options include: * - `overwrite`: overwrite the existing data. * - `append`: append the data. * - `ignore`: ignore the operation (i.e. no-op). * - `error`: default option, throw an exception at runtime. * * @since 1.4.0 */def mode(saveMode: String): DataFrameWriter = { this.mode = saveMode.toLowerCase match { case overwrite = SaveMode.Overwrite case append = SaveMode.Append case ignore = SaveMode.Ignore case error | default = SaveMode.ErrorIfExists case _ = throw new IllegalArgumentException(s Unknown save mode: $saveMode. + Accepted modes are overwrite , append , ignore , error .) } this}
DataFrameWriter.save()
1. save 將結(jié)果保存?zhèn)魅氲穆窂健?/p>
/** * Saves the content of the [[DataFrame]] at the specified path. * * @since 1.4.0 */def save(path: String): Unit = { this.extraOptions += ( path - path) save()}
2. 追蹤 save 方法。
/** * Saves the content of the [[DataFrame]] as the specified table. * * @since 1.4.0 */def save(): Unit = { ResolvedDataSource( df.sqlContext, source, partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]), mode, extraOptions.toMap, df)}
3. 其中 source 是 SQLConf 的 defaultDataSourceNameprivate var source: String = df.sqlContext.conf.defaultDataSourceName 其中 DEFAULT_DATA_SOURCE_NAME 默認(rèn)參數(shù)是 parquet。
// This is used to set the default data sourceval DEFAULT_DATA_SOURCE_NAME = stringConf(spark.sql.sources.default , defaultValue = Some( org.apache.spark.sql.parquet), doc = The default data source to use in input/output. )
DataFrame.scala 中部分函數(shù)詳解:
1. toDF 函數(shù)是將 RDD 轉(zhuǎn)換成 DataFrame
/** * Returns the object itself. * @group basic * @since 1.3.0 */// This is declared with parentheses to prevent the Scala compiler from treating// `rdd.toDF(1)` as invoking this toDF and then apply on the returned DataFrame.def toDF(): DataFrame = this
2. show() 方法:將結(jié)果顯示出來
/** * Displays the [[DataFrame]] in a tabular form. For example: * {{{ * year month AVG( Adj Close) MAX(Adj Close) * 1980 12 0.503218 0.595103 * 1981 01 0.523289 0.570307 * 1982 02 0.436504 0.475256 * 1983 03 0.410516 0.442194 * 1984 04 0.450090 0.483521 * }}} * @param numRows Number of rows to show * @param truncate Whether truncate long strings. If true, strings more than 20 characters will * be truncated and all cells will be aligned right * * @group action * @since 1.5.0 */// scalastyle:off printlndef show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))// scalastyle:on println
追蹤 showString 源碼如下:showString 中觸發(fā) action 收集數(shù)據(jù)。
/** * Compose the string representing rows for output * @param _numRows Number of rows to show * @param truncate Whether truncate long strings and align cells right */private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = { val numRows = _numRows.max(0) val sb = new StringBuilder val takeResult = take(numRows + 1) val hasMoreData = takeResult.length numRows val data = takeResult.take(numRows) val numCols = schema.fieldNames.length
看完上述內(nèi)容,你們對 Spark SQL 數(shù)據(jù)加載和保存的實例分析有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注丸趣 TV 行業(yè)資訊頻道,感謝大家的支持。