久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Spark RDD的創建方式及算子的使用方法是什么

167次閱讀
沒有評論

共計 6085 個字符,預計需要花費 16 分鐘才能閱讀完成。

這篇文章主要介紹“Spark RDD 的創建方式及算子的使用方法是什么”,在日常操作中,相信很多人在 Spark RDD 的創建方式及算子的使用方法是什么問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Spark RDD 的創建方式及算子的使用方法是什么”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!

一:簡單了解 RDD 和 RDD 處理數據

 RDD,全稱為 Resilient Distributed Datasets,是一個容錯的、并行的數據結構,可以讓用戶顯式地將數據存儲到磁盤和內存中,并能控制數據的分區。

 RDD:Spark 的核心概念是 RDD (resilientdistributed dataset),指的是一個只讀的,可分區的分布式數據集,這個數據集的全部或部分可以緩存在內存中,在多次計算間重用。

 RDD 本質上是一個內存數據集,在訪問 RDD 時,指針只會指向與操作相關的部分。例如存在一個面向列的數據結構,其中一個實現為 Int 的數組,另一個實現為 Float 的數組。如果只需要訪問 Int 字段,RDD 的指針可以只訪問 Int 數組,避免了對整個數據結構的掃描。

 RDD 將操作分為兩類:transformation 與 action。無論執行了多少次 transformation 操作,RDD 都不會真正執行運算,只有當 action 操作被執行時,運算才會觸發。而在 RDD 的內部實現機制中,底層接口則是基于迭代器的,從而使得數據訪問變得更高效,也避免了大量中間結果對內存的消耗。

  在實現時,RDD 針對 transformation 操作,都提供了對應的繼承自 RDD 的類型,例如 map 操作會返回 MappedRDD,而 flatMap 則返回 FlatMappedRDD。當我們執行 map 或 flatMap 操作時,不過是將當前 RDD 對象傳遞給對應的 RDD 對象而已。

注意:創建的 Maven 工程,以下是 pom.xml 中的依賴:

dependencies 
 dependency 
 groupId junit /groupId 
 artifactId junit /artifactId 
 version 4.12 /version 
 /dependency 
 dependency 
 groupId org.apache.spark /groupId 
 artifactId spark-core_2.10 /artifactId 
 version 1.6.1 /version 
 /dependency 
 dependency 
 groupId org.apache.hadoop /groupId 
 artifactId hadoop-client /artifactId 
 version 2.6.4 /version 
 /dependency 
 dependency 
 groupId org.apache.spark /groupId 
 artifactId spark-sql_2.10 /artifactId 
 version 1.6.1 /version 
 /dependency 
 /dependencies

二:從 Hadoop 文件系統(或與 Hadoop 兼容的其他持久化存儲系統,如 Hive,HBase)輸出(HDFS)創建。

    eg:  求 HDFS 文件中內容所有行數據長度及總長度。

public class TestRDD1 {public static void main(String[] args) {createRDDFromHDFS();
 private static void createRDDFromHDFS(){SparkConf conf = new SparkConf();
 conf.set( spark.testing.memory ,  269522560000 
 JavaSparkContext sc = new JavaSparkContext(local ,  Spark Test , conf);
 System.out.println( sc );
 JavaRDD String  rdd = sc.textFile( hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt 
 JavaRDD Integer  newRDD = rdd.map( new Function String,Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(String string) throws Exception {System.out.println( string +     + string.length() );
 return string.length();
 System.out.println( newRDD.count() );
 int length = newRDD.reduce( new Function2 Integer, Integer, Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(Integer int1, Integer int2) throws Exception {
 return int1+int2;
 System.out.println(總和  + length);
}

三:通過 parallelize 或 makeRDD 將單擊數據創建為分布式 RDD。

eg:求總和。

public class TestRDD2 {public static void main(String[] args) {createRDDFromSuperRDD();
  * JavaSparkContext(String master, String appName, SparkConf conf)
  * master - Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
  * appName - A name for your application, to display on the cluster web UI
  * conf - a SparkConf object specifying other Spark parameters
  * */
 private static void createRDDFromSuperRDD(){SparkConf conf = new SparkConf();
 conf.set( spark.testing.memory ,  269522560000 
 JavaSparkContext sc = new JavaSparkContext(local ,  Spark Test , conf);
 System.out.println( sc );
 List Integer  list = new ArrayList Integer 
 for( int i=1;i i++){list.add(i);
 JavaRDD Integer  rdd = sc.parallelize(list);
 JavaRDD Integer  newRDD = rdd.map( new Function Integer,Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(Integer int1) throws Exception {
 return int1;
 int count = newRDD.reduce( new Function2 Integer, Integer, Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(Integer int1, Integer int2) throws Exception {
 return int1+int2;
 System.out.println(總和  + count);
}

注意:上述兩段代碼中,在獲取 JavaSparkContext 的時候,是這樣寫的:

 SparkConf conf = new SparkConf();

     conf.set(spark.testing.memory , 269522560000  // 給 jvm 足夠的資源。

     JavaSparkContext sc = new JavaSparkContext(local , Spark Test , conf);

而對于標記的加粗紅色部分,參照 API 如下:

 JavaSparkContext(String master, String appName, SparkConf conf)

 -master – Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
 -appName – A name for your application, to display on the cluster web UI
 -conf – a SparkConf object specifying other Spark parameters

對于 master,官網有詳細的介紹:

我這里寫的是 local,表示的是:

    對于本地模式測試和單元測試,可以通過 local 在 spark 內運行程序。

******************************

另外寫的一段,對算子中一些基本方法的使用

參考學習:

    RDD 算子分類: http://my.oschina.net/gently/blog/686800 (自己的。)

public class TestRDD3 {
 private static String appName =  Test Spark RDD 
 private static String master =  local 
 public static void main(String[] args) {SparkConf conf = new SparkConf();
 conf.set( spark.testing.memory ,  269522560000 
 JavaSparkContext sc = new JavaSparkContext(master, appName, conf);
 System.out.println( sc );
 List String  list = new ArrayList String 
 list.add(  Berg  );
 list.add(  Hadoop  );
 list.add(  HBase  );
 list.add(  Hive  );
 list.add(  Spark  );
 JavaRDD String  rdd = sc.parallelize(list);
 JavaRDD Integer  newrdd = rdd.map( new Function String,Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(String string) throws Exception {System.out.println( string +  \t  +string.length() );
 return string.length();
 Integer length = newrdd.reduce( new Function2 Integer, Integer, Integer () {
 private static final long serialVersionUID = 1L;
 public Integer call(Integer i1, Integer i2) throws Exception {
 return i1+i2;
 long count = newrdd.count();
 List Integer  listnewrdd = newrdd.collect();
 for (Integer integer : listnewrdd) {System.out.print(integer +   \t  );
 System.out.println(  \nlength --    + length +     + count );
 System.out.println(  \n\n**************************************\n\n 
 List Integer  list1 = new ArrayList Integer 
 for( int i=1; i i++){list1.add( i );
 JavaRDD Integer  rdd1 = sc.parallelize(list1);
 JavaRDD Integer  unionrdd = newrdd.union(rdd1);
 JavaRDD Integer  rdd2 = unionrdd.map( new Function Integer,Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(Integer i) throws Exception {
 return i;
 long count2 = rdd2.reduce( new Function2 Integer, Integer, Integer () {
 private static final long serialVersionUID = 1L;
 public Integer call(Integer arg0, Integer arg1) throws Exception {
 return arg0 + arg1;
 System.out.println(count2 --    +count2 );
 rdd2.foreach( new VoidFunction Integer (){
 private static final long serialVersionUID = 1L;
 public void call(Integer arg0) throws Exception {System.out.println(  foreach--    + arg0 );
}

到此,關于“Spark RDD 的創建方式及算子的使用方法是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計6085字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 嘉善县| 阳城县| 犍为县| 甘南县| 高碑店市| 萨迦县| 石狮市| 灵武市| 苍南县| 石景山区| 池州市| 陆良县| 哈巴河县| 阳曲县| 广丰县| 松江区| 永兴县| 当涂县| 上栗县| 曲靖市| 溧水县| 什邡市| 兴安县| 富川| 上犹县| 浙江省| 和田市| 阿拉尔市| 自贡市| 黄陵县| 荥经县| 丹巴县| 绵竹市| 怀柔区| 邻水| 枝江市| 迭部县| 嘉祥县| 黄浦区| 凤山县| 蕉岭县|