共計 9668 個字符,預計需要花費 25 分鐘才能閱讀完成。
這篇文章主要講解了“Spark 編程知識點有哪些”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著丸趣 TV 小編的思路慢慢深入,一起來研究和學習“Spark 編程知識點有哪些”吧!
#Spark 開發指南 #
## 簡介 ## 總的來說,每一個 Spark 應用程序,都是由一個驅動程序組成,它運行用戶的 main 函數,并且在一個集群上執行各種各樣的并行操作。Spark 提供的主要的抽象(概念)是一個彈性分布式數據集,它是一個元素集合,劃分到集群的不同節點上,可以被并行操作。RDDs 的創建可以從 Hadoop 文件系統(或者任何支持 Hadoop 的文件系統)上的一個文件開始,或者通過轉換這個驅動程序中已存在的 Scala 集合而來。用戶也可以使 Spark 持久化一個 RDD 到內存中,使其能在并行操作中被有效的重用。最后,RDDs 能自動從節點故障中恢復。
Spark 中的第二個抽象(概念)是共享變量,他可以在并行操作中使用。默認情況下,Spark 通過不同節點上的一系列任務來并行運行一個函數。他將每一個函數中用的到變量的拷貝傳遞到每一個任務中。有時候,一個變量需要在不同的任務之間,或者任務和驅動程序之間共享。Spark 支持兩種類型的共享變量:廣播變量,可以再所有節點的內存中緩存一個值,累加器,一個只能做加法的變量,例如計數器和求和。
本指南通過每一種 Spark 支持的語言來展示 Spark 的每個特性。It is easiest to follow along with if you launch Spark’s interactive shell – either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.
## 接入 Spark##
###Java###
Spark1.0.2 工作在 Java6 或者 java6 以后之上。如果你在使用 Java8,Spark 支持 lamdba 表達式來簡化函數編寫,否則,你可以使用 org.apache.spark.api.java.function 包下的類。
用 Java 編寫 Spark 應用,你需要添加 Spark 的依賴,Spark 可以通過 Maven Central 使用:
groupId=org.apache.spark artifactId=spark-core_2.10 version=1.0.2
另外,如果你想訪問一個 HDFS 集群,你需要根據你的 HDFS 版本添加一個 hadoop-client 依賴。一些常用的 HDFS 版本標簽顯示在頁面。
groupId=org.apache.hadoop artifactId=hadoop-client version= your-hdfs-version
最后,你需要在你的程序中導入一些 Spark 類,通過添加如下幾行:
import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.api.java.JavaRDD import org.apache.spark.SparkConf
## 初始化 Spark##
###Java### Spark 程序需要做的第一件事就是創建一個 JavaSparkContext 對象,它將告訴 Spark 怎樣訪問一個集群。創建一個 SparkContext,你首先必須創建 SparkConf 對象,它包含關于你的應用程序的信息。
SparkConf conf=new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc=new JavaSparkContext(conf);
appName 參數是你的應用程序的名字,將會在集群的UI上顯示。master 是 Spark、Mesos、或者 YARN 集群 URL,或者一個專用的字符串”local“使其在本地模式下運行。在實踐中,當運行在一個集群上,你將不會想要把 master 硬編碼到程序中,而是通過使用 spark-submit 運行程序并且接受 master。但是,在本地測試或者單元測試中,你可以傳遞”local“在進程內運行 Spark。
## 彈性分布式數據集 ## Spark 反復圍繞的一個概念是彈性分布式數據集。它是一個有容錯機制的元素集合,并且可以被并行操作。有兩種創建 RDDs 的方法。并行化你的驅動程序中已存在的集合,或者引用一個外部存儲系統的數據集,例如一個共享文件系統,HDFS、HBase、或者任何可以提供一個 Hadoop InputFormat 的數據源。
### 并行集合 ### 并行集合通過調用 JavaSparkContext 的 parallelize 方法,在你的驅動程序中已存在的 Collection 上創建。集合的元素將會拷貝組成一個可以被并行操作的分布式數據集。例如,下面是如何創建一個包含數字 1 到 5 的并行集合:
List Integer data=Arrays.asList(1,2,3,4,5); JavaRDD Integer distData=sc.parallelize(data);
一旦創建,分布式數據集(distData)就可以并行操作。例如,我們可以調用 distData.reduce((a,b)- a+b) 來將列表中的元素相加。我們稍后將會在分布式數據集的操作中描述。
注意:在這個指南中,我們經常使用簡潔的 Java8 lamdba 語法來定義 java functions,但是在老的 Java 版本中,你可以實現 org.apache.spark.api.java.function 包中的接口。我們將會在下面詳細描述 passing functions to Spark。
并行集合的另一個重要的參數是數據集被切分成切片(slices)的數量。Spark 將會為集群中的每一個 slice 運行一個 task。通常情況下,你要為集群中的每個 CPU 2- 4 個 slice。通常,Spark 會嘗試根據你的集群自動設置 slice 的數量。然而,你可以手動的設置它,把它作為第二個參數傳遞給 parallelize(例如:sc.parallelize(data,10)).
### 外部數據集 ### Spark 可以通過任何 Hadoop 支持的存儲源創建分布式數據集。包括你的本地文件系統,HDFS,Cassandra,HBase,Amazon S3 等等。Spark 支持 text files(文本文件),SequenceFiles(序列化文件),和任何其他的 Hadoop InputFormat(輸入格式)。
Text file 可以通過使用 SparkContext 的 textFile 方式創建。這個方法接受一個文件的 URI(或者機器上的一個本地路徑, 或者 hdfs://,s3n:// 等 URI)并且把這個文件讀成一個行的集合。下面是一個調用的例子:
JavaRDD String distFile=sc.textFile(data.txt
一旦創建,distFile 可以被進行數據集操作。例如: 我們可以通過使用 map 和 reduce 將所有數據行的長度相加. 例如:distFile.map(s- s.length()).reduce((a,b)- (a+b)).
Spark 讀文件時的一些注意事項:
如果使用本地文件系統上的路徑,
Spark 的所有基于文件的輸入方法,包括 textFile, 支持運行目錄,壓縮文件盒通配符。例如,你可以食用 textFile(/my/directory/ ),textFile(/my/directory/.txt), 和 textFile(/my/directory/.gz)
textFile 方法也可以接受一個可選的第二參數來控制這個文件的 slice 數目。默認情況下,Spark 為每一個文件創建一個 slice(HDFS 中 block 默認為 64MB)。但是你可以通過傳遞一個較大的值來指定一個跟高的 slice 值。注意你的 slice 數不能小于 block 數。
除了文本文件,Spark 的 Java API 也支持集中其他數據格式。
JavaSparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
對于序列化文件(SequenceFiles),使用 SparkContext 的 sequenceFile[K,V],K 和 V 是文件中 key 和 value 的類型。他們必須是 Hadoop 的 Writeable 接口的子類,像 IntWriteable 和 Text。
對于其他的 Hadoop 輸入格式,你可以使用 JavaSparkContext.hadoopRDD 方法。它可以接受任意(類型)的 JobConf 和輸入格式類,key 類和 value 類。按照像 Hadoop Job 一樣,來設置輸入源就可以了。你也可以為 InputFormats 使用 JavaSparkContext.newHadoopRDD,基于”new“MapReduce API(org.apache.hadoop.mapreduce).
JavaRDD.saveAsObjectFile 和 JavaContext.objectFile 支持以一種由 Java 對象序列化組成的簡單的格式保存 RDD。雖然這不是有效地專門的格式向 Avro,但是它提供了一個簡單的方式存儲 RDD。
###RDD 操作 ### RDDs 支持兩種類型的操作:轉換(transformations),它從一個現有的數據集創建一個新的數據集。動作 (actions),它在數據集上運行計算后,返回一個值給驅動程序。例如:map 就是一個轉換,它將數據集的每一個元素傳遞給一個函數,并且返回一個新的 RDD 表示結果。另一方面,reduce 是一個動作,他通過一些行數將一些 RDD 的所有元素聚合起來,并把最終的結果返回給驅動程序(不過還有一個并行的 reduceByKey,它返回一個分布式數據集)。
Spark 中的所有轉換都是惰性的,也就是說,他們不會立即計算出結果。相反,他們只是記住應用到這些基礎數據集(例如 file)上的轉換。只有當發生一個需要返回一個結果給驅動程序的動作時,這些轉換才真正執行。這樣的設計使得 Spark 運行更加高效——例如,我們可以實現,通過 map 創建一個數據集,并在 reduce 中使用,最終只返回 reduce 的結果給驅動程序,而不是整個大的新數據集。
默認情況下,每一個轉換過的 RDD 都會在你在它上面運行一個 action 時重新計算。然而,你也可以使用 persist 方法(或者 cache)持久化一個 RDD 到內存中。在這種情況下,Spark 將會在集群中,保存相關元素,下次你訪問這個 RDD 時,它將能夠更快速訪問,。在磁盤上持久化數據集,或者在集群間復制數據集也是支持的。
#### 基本操作 #### 為了說明 RDD 基礎,考慮下面的簡單的程序:
JavaDDD String lines=sc.textFile(data.txtt JavaRDD Integer lineLengths=lines.map(s- s.length()); int totalLength=lineLengths.reduce((a,b)- a+b);
第一行通過一個外部文件定義了一個基本的 RDD。這個數據集未被加載到內存,也未在上面執行動作。lines 僅僅是這個文件的一個指針。第二行定義了 lineLengths 作為 map 轉換的結果。此外,lineLengths 因為惰性沒有立即計算。最后,我們運行 reduce,他是一個 action。這時候,Spark 將這個計算拆分成不同的 task,并使其運行在獨立的機器上,并且每臺機器運行它自己的 map 部分和本地的 reducation,僅僅返回他的結果給驅動程序。
如果我們想在以后重復使用 lineLengths,我們可以添加:
lineLengths.persist();
在 reduce 之前,這將導致 lineLengths 在第一次被計算之后被保存在內存中。
#### 傳遞 Functions 到 Spark#### Spark 的 API,在很大程度上依賴于傳遞函數使其驅動程序在集群上運行。在 Java 中,函數有實現了 org.apache.spark.api.java.function 包中接口的類表示。有兩種創建這樣的函數的方式:
在你自己的類中實現 Function 接口,可以是匿名內部類,后者命名類,并且你要傳遞他的一個實例到 Spark
在 Java8 中,使用 lamdba 表達式來簡潔的定義一種實現
為了簡潔起見,本指南中的大多數使用 lamdba 語法,它易于使用,所有的 APIs in long-form,例如,我們可以編寫上面的代碼如下:
JavaRDD String lines = sc.textFile( data.txt
JavaRDD Integer lineLengths = lines.map(new Function String, Integer () { public Integer call(String s) { return s.length(); }
int totalLength = lineLengths.reduce(new Function2 Integer, Integer, Integer () { public Integer call(Integer a, Integer b) { return a + b; }
});
或者,如果編寫內聯函數顯得很笨拙:
class GetLength implements Function String, Integer { public Integer call(String s) { return s.length(); }
class Sum implements Function2 Integer, Integer, Integer { public Integer call(Integer a, Integer b) { return a + b; }
JavaRDD String lines = sc.textFile( data.txt
JavaRDD Integer lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());
Note that anonymous inner classes in Java can also access variables in the enclosing scope as long as they are marked final. Spark will ship copies of these variables to each worker node as it does for other languages
####Wroking with Key-Value Pairs 使用鍵 / 值對工作 #### 雖然大多數 Spark 操作工作在包含各種類型的對象的 RDDs 之上,一些特殊的操作僅僅能夠使用包含 key-value 對的 RDDs。最常見的操作之一是分布式”shuffle“操作,例如通過 key 分組或者聚合元素。
在 Java 中,key-value 對使用 scala 標準包下的 scala Tuple2 類表示。你可以簡單的調用 new Tuple2(a,b) 去創建一個 tuuple,并且通過 tuple._1() 和 tuple._2() 訪問它的字段。
key-value 對的 RDDs 通過 JavaPairRDD 表示。你可以通過 JavaRDDs 構建 JavaPairRDDs,使用指定的 map 操作版本,像 mapToPair 和 flatMapToPair。JavaPair 將不僅擁有標準 RDD 函數,并且有特殊的 key-value 函數。
例如,下面的代碼在 key-value 對上使用 reduceByKey 操作來計算在一個文件中每行文本出現的次數和。
JavaRDD String lines=sc.textFile(data.txt JavaPairRDD String,Integer pairs=lines.mapToPair(s- new Tuple2(s,1)) JavaPairRDD String,Integer counts=pairs.reduceByKey((a,b)- a+b);
我們也可以使用 counts.sortByKey(), 例如,按照字母順序排序這個鍵值對。并且最后調用 counts.collect() 作為一個對象數組返回給驅動程序。
注意:當使用自定義的對象作為 key-value 對操作的 key 時,你必須確保自定義 equals()方法伴隨著一個匹配的 hashCode()方法。有關詳情,參考 Object.hashCode() 文檔大綱中列出的規定。
#### 轉換 #### 下面的表格列出了 Spark 支持的常見的轉換。更多信息可以參考 RDD API 文檔和 pair RDD 函數文檔。
#### 動作 #### 下面的表格列出了 Spark 支持的常見的動作。更多信息可以參考 RDD API 文檔和 pair RDD 函數文檔。
###RDD 持久化 ### Spark 最重要的一個功能是在不同的操作間,持久化(或者緩存)一個數據集到內存中。當你持久化一個 RDD 時,每一個節點都把它計算的分片結果保存在內存中,并且在對此數據集(或者衍生出的數據集)進行其他動作時重用。這將使后續的動作變得更快(通過快 109 倍以上)。緩存是(Spark)迭代算法和快速交互使用的關鍵工具。
你可以使用 persist() 和 cache()方法來標記一個將要持久化的 RDD。第一次他被一個動作進行計算,他將會保留在這個節點的內存中。Spark 的緩存有容錯性 - 如果 RDD 的任何一個分區丟失了,他會通過使用最初創建的它轉換操作,自動重新計算。
此外,每一個持久化 RDD 可以使用不同的存儲級別存儲。允許你,例如,持久化數據集到磁盤,持久化數據集到內存作為序列化的 Java 對象(節省空間),跨節點復制,或者 store it off-heap in Tachyon。這些級別通過傳遞一個 StorageLevel 對象(Scala,Java,Python)到 persist()來設置。cache()方法是使用默認存儲級別的快捷方法,即 StorageLevel.MEMORY_ONLY(存儲反序列化對象到內存), 完整的存儲級別設置為:
Spark 也會在 shuffle 操作(例如,reduceByKey)中自動的持久化一些中間數據。甚至當用戶未調用 persist 方法。這樣做是為了阻止在進行 shuffle 操作時由于一個節點故障而重新計算整個輸入。我們依然推薦用戶在作為結果的 RDD 上調用 persist 如果想打算重用它。
#### 存儲級別的選擇 ####
#### 移除數據 #### Spark 自動監視每一個節點上的緩存使用,并且使用 LRU 方式刪除老的數據分區。如果你想手工的刪除 yige RDD 而不是等他自動從緩存中清除,使用 RDD.unpersist() 方法。
## 共享變量 ## 通常,當傳遞給 Spark 操作(例如 map 或者 reduce)的函數在遠程集群節點上運行時,它實際上操作的是這個函數使用到的所有變量的獨立拷貝。這些變量被拷貝到每一臺機器,并且在遠程機器上的對這些變量的所有更新都不會傳回給驅動程序。通常看來,在不同的任務之間讀寫變量是低效的。然而,Spark 還是為兩種常見的使用模式提供了兩種有限的共享變量:廣播變量和累加器。
### 廣播變量 ### 廣播變量允許程序員保存一個只讀的變量緩存在每一臺機器上,而不是每個任務都保存一份拷貝。它們可以這樣被使用,例如:以一種高效的方式給每一個節點一個大的輸入數據集。Spark 會嘗試使用一種高效的廣播算法來分配廣播變量,以減小通信的代價。
廣播變量通過調用 SparkContext.broadcast(v) 方法從變量 v 創建。廣播變量是一個 v 的包裝器。它的值可以通過調用 value 方法訪問。下面的代碼展示了這些:
Broadcast int[] broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]
在廣播變量被創建后,它應該在集群上任何函數中代替 v 被使用,使 v 不再傳遞到這些節點上。此外,對象 v 在被廣播后不能被修改,這樣可以保證所有節點獲得的廣播變量的值是相同的(例如,這個變量在之后被傳遞到一個新的節點)。
### 累加器 ### 累加器是一種只能通過關聯操作進行”加“操作的變量。因此可以高效的支持并行計算。它們可以用于實現計數器(* 例如在 MapReduce 中)和求和。Spark 原生支持數字類型的累加器。開發者也可以自己添加新的支持類型。
一個累加器可以通過調用 SparkContext.accumulator(v) 方法從一個初始值 v 中創建。運行在集群上的任務,可以通過使用 add 方法或者 += 操作(在 Scala 和 Python 中)來給它加值。然而,他們不能讀取這個值。只有驅動程序可以使用 value 的方法來讀取累加器的值。
如下的代碼,展示了如何利用累加器,將一個數組里面的所有元素相加:
Accumulator Integer accum = sc.accumulator(0);
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x - accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10
雖然這段代碼使用了內置支持 Integer 類型的累加器。但是開發者也可以通過實現 AccumulatorParam 創建自己的類型。AccumulatorParam 接口有兩個方法:zero 為你的數據類型提供一個 zero value ,addInPlace 將兩個值相加。例如,假設我們有一個向量類來表示數學向量,我們可以這樣寫:
class VectorAccumulatorParam implements AccumulatorParam Vector { public Vector zero(Vector initialValue) { return Vector.zeros(initialValue.size());
}
public Vector addInPlace(Vector v1, Vector v2) { v1.addInPlace(v2); return v1;
}
// Then, create an Accumulator of this type:
Accumulator Vector vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());
在 Java 中,Spark 也支持更通用的 Accumulable 接口來累加數據,他們的計算結果類型和相加的元素的類型不一樣(例如,收集同樣的元素構建一個 list)。
感謝各位的閱讀,以上就是“Spark 編程知識點有哪些”的內容了,經過本文的學習后,相信大家對 Spark 編程知識點有哪些這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是丸趣 TV,丸趣 TV 小編將為大家推送更多相關知識點的文章,歡迎關注!