久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Spark SQL編程的示例分析

151次閱讀
沒有評論

共計 5995 個字符,預計需要花費 15 分鐘才能閱讀完成。

這篇文章將為大家詳細講解有關 Spark SQL 編程的示例分析,丸趣 TV 小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

#Spark SQL 編程指南 #

## 簡介 ## Spark SQL 支持在 Spark 中執行 SQL,或者 HiveQL 的關系查詢表達式。它的核心組件是一個新增的 RDD 類型 JavaSchemaRDD。JavaSchemaRDD 由 Row 對象和表述這個行的每一列的數據類型的 schema 組成。一個 JavaSchemaRDD 類似于傳統關系數據庫的一個表。JavaSchemaRDD 可以通過一個已存在的 RDD,Parquet 文件,JSON 數據集,或者通過運行 HiveSQL 獲得存儲在 Apache Hive 上的數據創建。

Spark SQL 目前是一個 alpha 組件。盡管我們會盡量減少 API 變化,但是一些 API 任然后再以后的發布中改變。

## 入門 ## 在 Spark 中,所有關系函數功能的入口點是 JavaSQLContext 類。或者他的子類。要創建一個基本的 JavaSQLContext,所有你需要的只是一個 JavaSparkContext。

JavaSparkContext sc = ...; // An existing JavaSparkContext.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);

## 數據源 ## Spark SQL 支持通過 JavaSchemaRDD 接口操作各種各樣的數據源。一單一個數據集被加載,它可以被注冊成一個表,甚至和來自其他源的數據連接。

###RDDs### Spark SQL 支持的表的其中一個類型是由 JavaBeans 的 RDD。BeanInfo 定義了這個表的 schema。現在,Spark SQL 不支持包括嵌套或者復雜類型例如 Lists 或者 Arrays 的 JavaBeans。你可以通過創建一個實現了 Serializable 并且它的所有字段都有 getters 和 setters 方法的類類創建一個 JavaBeans。

public static class Person implements Serializable {
 private String name;
 private int age;
 public String getName() {
 return name;
 }
 public void setName(String name) {
 this.name = name;
 }
 public int getAge() {
 return age;
 }
 public void setAge(int age) {
 this.age = age;
 }
}

一個 schema 可以被應用在一個已存在的 RDD 上,通過調用 applySchema 并且提供這個 JavaBean 的類對象。

// sc is an existing JavaSparkContext.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc)
// Load a text file and convert each line to a JavaBean.
JavaRDD Person  people = sc.textFile(examples/src/main/resources/people.txt).map( new Function String, Person () { public Person call(String line) throws Exception { String[] parts = line.split( , 
 Person person = new Person();
 person.setName(parts[0]);
 person.setAge(Integer.parseInt(parts[1].trim()));
 return person;
 }
 });
// Apply a schema to an RDD of JavaBeans and register it as a table.
JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
schemaPeople.registerAsTable( people 
// SQL can be run over RDDs that have been registered as tables.
JavaSchemaRDD teenagers = sqlContext.sql(SELECT name FROM people WHERE age  = 13 AND age  = 19)
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List String  teenagerNames = teenagers.map(new Function Row, String () { public String call(Row row) { return  Name:   + row.getString(0);
 }
}).collect();

注意,Spark SQL 目前使用一個非常簡單的 SQL 解析器。用戶如果想獲得一個更加完整的 SQL 方言,應該看看 HiveContext 提供的 HiveQL 支持。

###Parquet Files### Parquet 是一個 columnar 格式,并且被許多其他數據處理系統支持。Spark SQL 對讀寫 Parquet 文件提供支持,并且自動保存原始數據的 Schema。通過下面的例子使用數據:

// sqlContext from the previous example is used in this example.
JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.
// JavaSchemaRDDs can be saved as Parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile( people.parquet 
// Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a JavaSchemaRDD.
JavaSchemaRDD parquetFile = sqlContext.parquetFile( people.parquet 
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable( parquetFile 
JavaSchemaRDD teenagers = sqlContext.sql( SELECT name FROM parquetFile WHERE age  = 13 AND age  = 19 
List String  teenagerNames = teenagers.map(new Function Row, String () { public String call(Row row) { return  Name:   + row.getString(0);
 }
}).collect();

###JSON Datasets### Spark SQL 可以自動推斷一個 JSON 數據集的 schema,并加載成一個 JavaSchemaRDD。這個轉換可以通過 JavaSQLContext 中的兩個方法中的一個完成:

jsonFile - 從一個目錄下的文件中加載數據,這個文件中的每一行都是一個 JSON 對象。

jsonRdd - 從一個已存在的 RDD 加載數據,這個 RDD 中的每一個元素是一個包含一個 JSON 對象的 String。

 // sc is an existing JavaSparkContext.
 JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
 // A JSON dataset is pointed to by path.
 // The path can be either a single text file or a directory storing text files.
 String path =  examples/src/main/resources/people.json 
 // Create a JavaSchemaRDD from the file(s) pointed to by path
 JavaSchemaRDD people = sqlContext.jsonFile(path);
 // The inferred schema can be visualized using the printSchema() method.
 people.printSchema();
 // root
 // |-- age: IntegerType
 // |-- name: StringType
 // Register this JavaSchemaRDD as a table.
 people.registerAsTable( people 
 // SQL statements can be run by using the sql methods provided by sqlContext.
 JavaSchemaRDD teenagers = sqlContext.sql( SELECT name FROM people WHERE age  = 13 AND age  = 19 
 // Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
 // an RDD[String] storing one JSON object per string.
 List String  jsonData = Arrays.asList(  {\ name\ :\ Yin\ ,\ address\ :{\ city\ :\ Columbus\ ,\ state\ :\ Ohio\}} 
 JavaRDD String  anotherPeopleRDD = sc.parallelize(jsonData);
 JavaSchemaRDD anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD);

###Hive Tables### Spark SQL 也支持讀和寫存儲在 apache Hive 中的數據。然而,由于 Hive 有一個非常大的依賴,他沒有在 Spark 默認寶中包括。為了使用 Hive,你必須運行‘SPARK_HIVE=true sbt/sbt assembly/assembly (或者對 Maven 使用 -Phive)。這個命令構建一個包含 Hive 的 assembly。注意,這個 Hive assembly 必須放在所有的工作節點上,因為它們需要訪問 Hive 的序列化和方序列化包(SerDes),以此訪問存儲在 Hive 中的數據。

可以通過 conf 目錄下的 hive-site.xml 文件完成 Hive 配置。

要和 Hive 配合工作,你需要構造一個 JavaHiveContext,它繼承了 JavaSQLContext,并且添加了發現 MetaStore 中的表和使用 HiveQL 編寫查詢的功能。此外,除了 sql 方法,JavaHiveContext 方法還提供了一個 hql 方法,它允許查詢使用 HiveQL 表達。

##Writing Language-Integrated Relational Queries## Language-Integrated 查詢目前只在 Scala 中被支持。

Spark SQL 同樣支持使用領域特定的語言來編寫查詢。再次,使用上面例子中的數據:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import sqlContext._
val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
// The following is the same as  SELECT name FROM people WHERE age  = 10 AND age  = 19 
val teenagers = people.where(age  = 10).where(age  = 19).select(name)
teenagers.map(t =   Name:   + t(0)).collect().foreach(println)

DSL 使用 Scala 中得到標記來表示基礎表中的表,他們使用一個前綴’標識。隱式轉換這些標記為被 SQL 執行引擎評估的表達式。支持這些功能的完成列表可以再 ScalaDoc 找到。

關于“Spark SQL 編程的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計5995字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 丽水市| 彭阳县| 宜宾市| 射洪县| 珲春市| 施甸县| 美姑县| 青海省| 深圳市| 四川省| 靖江市| 运城市| 桃源县| 定兴县| 怀集县| 雷波县| 临城县| 象山县| 贵定县| 奇台县| 延安市| 恭城| 呼伦贝尔市| 高雄市| 察哈| 长白| 固原市| 志丹县| 于田县| 兴海县| 江津市| 河南省| 东阳市| 乃东县| 内黄县| 孟连| 合肥市| 连南| 泗洪县| 达日县| 沈阳市|