共計 4740 個字符,預計需要花費 12 分鐘才能閱讀完成。
本篇內容主要講解“Spark Data Sources 怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓丸趣 TV 小編來帶大家學習“Spark Data Sources 怎么使用”吧!
一:Data Sources(數據源):
1.1 了解數據源。
Spark SQL 支持對各種數據源通過 DataFrame 接口操作。DataFrame 可以作為正常 的 RDDs 進行操作,也可以注冊為一個臨時表。
注冊 DataFrame 為一個表允許您在其數據運行 SQL 查詢。本節介紹用于加載和保存使用 Spark 數據源的數據的一般方法,然后再進入到可用的內置數據源的特定選項。
1.2 Generic Load/Save Functions(通用加載 / 保存功能)。
最簡單的形式,默認的數據源(parquet 除非否則配置由 spark.sql.sources.default)將用于所有操作。
eg:第一種讀取方式:通過 parquetFile(xxx) 來讀取。
首先把 spark-1.6.1-bin-hadoop2.6\examples\src\main\resources 下的 users.parquet 上傳到 HDFS 上。
public class SparkSqlDemo4 {
private static String appName = Test Spark RDD
private static String master = local
public static void main(String[] args) {SparkConf conf = new SparkConf();
conf.set( spark.testing.memory , 269522560000
JavaSparkContext sc = new JavaSparkContext(master, appName, conf);
// 創建了 sqlContext 的上下文,注意,它是 DataFrame 的起點。SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read().load( hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet
df.select(name , favorite_color).write().save( namesAndFavColors.parquet
// 指定保存模式
//df.select(name , favorite_color).write().mode(SaveMode.Overwrite).save( namesAndFavColors.parquet
// 第一種讀取方式
DataFrame parquetFile = sqlContext.parquetFile( namesAndFavColors.parquet
parquetFile.registerTempTable( users
DataFrame df1 = sqlContext.sql( SELECT name,favorite_color FROM users
df1.show();
List String listString = df1.javaRDD().map(new Function Row, String () {
private static final long serialVersionUID = 1L;
public String call(Row row) {
return Name: + row.getString(0) + ,FavoriteColor: + row.getString(1);
}
}).collect();
for (String string : listString) {System.out.println( string );
}
輸出結果如下:
+------+--------------+
| name|favorite_color|
+------+--------------+
|Alyssa| null|
| Ben| red|
+------+--------------+
Name: Alyssa ,FavoriteColor: null
Name: Ben ,FavoriteColor: red
1.3 Manually Specifying Options(手動指定選項):
你可以也手動指定的數據源,將與您想要將傳遞給數據源的任何額外選項一起使用。
數據源由其完全限定名稱 (即 org.apache.spark.sql.parquet),
但對于內置來源您還可以使用 他們短名稱 (json, parquet, jdbc)。
DataFrames 任何類型可以轉換為其他類型,使用此語法。
eg:第二種讀取方式:通過 parquet(xxx) 來讀取。
public class SparkSqlDemo5 {
private static String appName = Test Spark RDD
private static String master = local
public static void main(String[] args) {SparkConf conf = new SparkConf();
conf.set( spark.testing.memory , 269522560000
JavaSparkContext sc = new JavaSparkContext(master, appName, conf);
// 創建了 sqlContext 的上下文,注意,它是 DataFrame 的起點。SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read().format( json).load( hdfs://192.168.226.129:9000/txt/sparkshell/people.json
df.select(id , name , sex , age).write().format( parquet).save( people.parquet
DataFrame parquetFile = sqlContext.read().parquet( people.parquet
parquetFile.registerTempTable( people
DataFrame df1 = sqlContext.sql( SELECT id,name,sex,age FROM people WHERE age = 21 AND age = 23
df1.show();
df1.printSchema();
List String listString = df1.javaRDD().map(new Function Row, String () {
private static final long serialVersionUID = 1L;
public String call(Row row) {
return Id: + row.getString(0) + , Name: +row.getString(1) + ,Sex +row.getString(2)+ , Age: + row.getLong(3);
}).collect();
for (String string : listString) {System.out.println( string );
}
1.4 Run SQL on files directly(直接在文件上運行 SQL)
您也可以查詢該文件直接使用 SQL,并對其進行查詢,而不是使用 API 讀取文件加載到 DataFrame。
public class SparkSqlDemo6 {
private static String appName = Test Spark RDD
private static String master = local
public static void main(String[] args) {SparkConf conf = new SparkConf();
conf.set( spark.testing.memory , 269522560000
JavaSparkContext sc = new JavaSparkContext(master, appName, conf);
// 創建了 sqlContext 的上下文,注意,它是 DataFrame 的起點。SQLContext sqlContext = new SQLContext(sc);
// 注意 sql 語句 parquet 后面目錄的符號 。
DataFrame df = sqlContext.sql( SELECT * FROM parquet.`hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet`
df.show();}
注意:sql 語句中紅色部分標記的符號。
SELECT * FROM parquet.`hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet`
二:Save Modes(保存模式):
Save 操作可以可選擇性地接收一個 SaveModel,如果數據已經存在了,指定如何處理已經存在的數據。意識到這些保存模式沒有利用任何鎖,也不是原子的,這很重要。因此,如果有多個寫入者試圖往同一個地方寫入,這是不安全的。此外,當執行一個 Overwrite,在寫入新的數據之前會將原來的數據進行刪除。
eg:指定保存模式:
df.select(name , favorite_color).write().mode(SaveMode.Overwrite).save(namesAndFavColors.parquet
Scala/JavaAny LanguageMeaningSaveMode.ErrorIfExists(default) error (default) 當保存 DataFrame 到一個數據源,如果數據已經存在,將會引發異常。SaveMode.Append append
當保存 DataFrame 到一個數據源,如果數據 / 表已經存在,DataFrame 的內容預計將追加到現有數據后面。
SaveMode.Overwrite overwrite 覆蓋模式意味著當保存 DataFrame 到一個數據源,如果數據 / 表已存在,現有數據預計將覆蓋原有的 DataFrame 內容。SaveMode.Ignore ignore Ignore 模式意味著當向數據源中保存一個 DataFrame 時,如果數據已經存在,save 操作不會將 DataFrame 的內容進行保存,也不會修改已經存在的數據。這與 SQL 中的 `CREATE TABLE IF NOT EXISTS` 相似。
三:Parquet 文件
Parquet 是一種列式存儲格式的文件,被許多其他數據處理系統所支持。Spark SQL 支持度對 Parquet 文件的讀和寫,自動保存原有數據的模式。
到此,相信大家對“Spark Data Sources 怎么使用”有了更深的了解,不妨來實際操作一番吧!這里是丸趣 TV 網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!