共計 6085 個字符,預計需要花費 16 分鐘才能閱讀完成。
這篇文章主要介紹“Spark RDD 的創建方式及算子的使用方法是什么”,在日常操作中,相信很多人在 Spark RDD 的創建方式及算子的使用方法是什么問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Spark RDD 的創建方式及算子的使用方法是什么”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!
一:簡單了解 RDD 和 RDD 處理數據
RDD,全稱為 Resilient Distributed Datasets,是一個容錯的、并行的數據結構,可以讓用戶顯式地將數據存儲到磁盤和內存中,并能控制數據的分區。
RDD:Spark 的核心概念是 RDD (resilientdistributed dataset),指的是一個只讀的,可分區的分布式數據集,這個數據集的全部或部分可以緩存在內存中,在多次計算間重用。
RDD 本質上是一個內存數據集,在訪問 RDD 時,指針只會指向與操作相關的部分。例如存在一個面向列的數據結構,其中一個實現為 Int 的數組,另一個實現為 Float 的數組。如果只需要訪問 Int 字段,RDD 的指針可以只訪問 Int 數組,避免了對整個數據結構的掃描。
RDD 將操作分為兩類:transformation 與 action。無論執行了多少次 transformation 操作,RDD 都不會真正執行運算,只有當 action 操作被執行時,運算才會觸發。而在 RDD 的內部實現機制中,底層接口則是基于迭代器的,從而使得數據訪問變得更高效,也避免了大量中間結果對內存的消耗。
在實現時,RDD 針對 transformation 操作,都提供了對應的繼承自 RDD 的類型,例如 map 操作會返回 MappedRDD,而 flatMap 則返回 FlatMappedRDD。當我們執行 map 或 flatMap 操作時,不過是將當前 RDD 對象傳遞給對應的 RDD 對象而已。
注意:創建的 Maven 工程,以下是 pom.xml 中的依賴:
dependencies
dependency
groupId junit /groupId
artifactId junit /artifactId
version 4.12 /version
/dependency
dependency
groupId org.apache.spark /groupId
artifactId spark-core_2.10 /artifactId
version 1.6.1 /version
/dependency
dependency
groupId org.apache.hadoop /groupId
artifactId hadoop-client /artifactId
version 2.6.4 /version
/dependency
dependency
groupId org.apache.spark /groupId
artifactId spark-sql_2.10 /artifactId
version 1.6.1 /version
/dependency
/dependencies
二:從 Hadoop 文件系統(或與 Hadoop 兼容的其他持久化存儲系統,如 Hive,HBase)輸出(HDFS)創建。
eg: 求 HDFS 文件中內容所有行數據長度及總長度。
public class TestRDD1 {public static void main(String[] args) {createRDDFromHDFS();
private static void createRDDFromHDFS(){SparkConf conf = new SparkConf();
conf.set( spark.testing.memory , 269522560000
JavaSparkContext sc = new JavaSparkContext(local , Spark Test , conf);
System.out.println( sc );
JavaRDD String rdd = sc.textFile( hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt
JavaRDD Integer newRDD = rdd.map( new Function String,Integer (){
private static final long serialVersionUID = 1L;
public Integer call(String string) throws Exception {System.out.println( string + + string.length() );
return string.length();
System.out.println( newRDD.count() );
int length = newRDD.reduce( new Function2 Integer, Integer, Integer (){
private static final long serialVersionUID = 1L;
public Integer call(Integer int1, Integer int2) throws Exception {
return int1+int2;
System.out.println(總和 + length);
}
三:通過 parallelize 或 makeRDD 將單擊數據創建為分布式 RDD。
eg:求總和。
public class TestRDD2 {public static void main(String[] args) {createRDDFromSuperRDD();
* JavaSparkContext(String master, String appName, SparkConf conf)
* master - Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* appName - A name for your application, to display on the cluster web UI
* conf - a SparkConf object specifying other Spark parameters
* */
private static void createRDDFromSuperRDD(){SparkConf conf = new SparkConf();
conf.set( spark.testing.memory , 269522560000
JavaSparkContext sc = new JavaSparkContext(local , Spark Test , conf);
System.out.println( sc );
List Integer list = new ArrayList Integer
for( int i=1;i i++){list.add(i);
JavaRDD Integer rdd = sc.parallelize(list);
JavaRDD Integer newRDD = rdd.map( new Function Integer,Integer (){
private static final long serialVersionUID = 1L;
public Integer call(Integer int1) throws Exception {
return int1;
int count = newRDD.reduce( new Function2 Integer, Integer, Integer (){
private static final long serialVersionUID = 1L;
public Integer call(Integer int1, Integer int2) throws Exception {
return int1+int2;
System.out.println(總和 + count);
}
注意:上述兩段代碼中,在獲取 JavaSparkContext 的時候,是這樣寫的:
SparkConf conf = new SparkConf();
conf.set(spark.testing.memory , 269522560000 // 給 jvm 足夠的資源。
JavaSparkContext sc = new JavaSparkContext(local , Spark Test , conf);
而對于標記的加粗紅色部分,參照 API 如下:
JavaSparkContext(String master, String appName, SparkConf conf)
-master – Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-appName – A name for your application, to display on the cluster web UI
-conf – a SparkConf object specifying other Spark parameters
對于 master,官網有詳細的介紹:
我這里寫的是 local,表示的是:
對于本地模式測試和單元測試,可以通過 local 在 spark 內運行程序。
******************************
另外寫的一段,對算子中一些基本方法的使用
參考學習:
RDD 算子分類: http://my.oschina.net/gently/blog/686800 (自己的。)
public class TestRDD3 {
private static String appName = Test Spark RDD
private static String master = local
public static void main(String[] args) {SparkConf conf = new SparkConf();
conf.set( spark.testing.memory , 269522560000
JavaSparkContext sc = new JavaSparkContext(master, appName, conf);
System.out.println( sc );
List String list = new ArrayList String
list.add( Berg );
list.add( Hadoop );
list.add( HBase );
list.add( Hive );
list.add( Spark );
JavaRDD String rdd = sc.parallelize(list);
JavaRDD Integer newrdd = rdd.map( new Function String,Integer (){
private static final long serialVersionUID = 1L;
public Integer call(String string) throws Exception {System.out.println( string + \t +string.length() );
return string.length();
Integer length = newrdd.reduce( new Function2 Integer, Integer, Integer () {
private static final long serialVersionUID = 1L;
public Integer call(Integer i1, Integer i2) throws Exception {
return i1+i2;
long count = newrdd.count();
List Integer listnewrdd = newrdd.collect();
for (Integer integer : listnewrdd) {System.out.print(integer + \t );
System.out.println( \nlength -- + length + + count );
System.out.println( \n\n**************************************\n\n
List Integer list1 = new ArrayList Integer
for( int i=1; i i++){list1.add( i );
JavaRDD Integer rdd1 = sc.parallelize(list1);
JavaRDD Integer unionrdd = newrdd.union(rdd1);
JavaRDD Integer rdd2 = unionrdd.map( new Function Integer,Integer (){
private static final long serialVersionUID = 1L;
public Integer call(Integer i) throws Exception {
return i;
long count2 = rdd2.reduce( new Function2 Integer, Integer, Integer () {
private static final long serialVersionUID = 1L;
public Integer call(Integer arg0, Integer arg1) throws Exception {
return arg0 + arg1;
System.out.println(count2 -- +count2 );
rdd2.foreach( new VoidFunction Integer (){
private static final long serialVersionUID = 1L;
public void call(Integer arg0) throws Exception {System.out.println( foreach-- + arg0 );
}
到此,關于“Spark RDD 的創建方式及算子的使用方法是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!