久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何使用Spark Shell進行交互式分析

181次閱讀
沒有評論

共計 5000 個字符,預計需要花費 13 分鐘才能閱讀完成。

本文丸趣 TV 小編為大家詳細介紹“如何使用 Spark Shell 進行交互式分析”,內容詳細,步驟清晰,細節處理妥當,希望這篇“如何使用 Spark Shell 進行交互式分析”文章能幫助大家解決疑惑,下面跟著丸趣 TV 小編的思路慢慢深入,一起來學習新知識吧。

基礎

Spark shell 提供了一種來學習該 API 比較簡單的方式, 以及一個強大的來分析數據交互的工具。在 Scala(運行于 Java 虛擬機之上, 并能很好的調用已存在的 Java 類庫)或者 Python 中它是可用的。通過在 Spark 目錄中運行以下的命令來啟動它:

Scala

Python

./bin/spark-shell

Spark 的主要抽象是一個稱為 Dataset 的分布式的 item 集合。Datasets 可以從 Hadoop 的 InputFormats(例如 HDFS 文件)或者通過其它的 Datasets 轉換來創建。讓我們從 Spark 源目錄中的 README 文件來創建一個新的 Dataset:

scala  val textFile = spark.read.textFile(README.md)
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

您可以直接從 Dataset 中獲取 values(值), 通過調用一些 actions(動作), 或者 transform(轉換)Dataset 以獲得一個新的。更多細節, 請參閱  API doc。

scala  textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala  textFile.first() // First item in this Dataset
res1: String = # Apache Spark

現在讓我們 transform 這個 Dataset 以獲得一個新的。我們調用  filter  以返回一個新的 Dataset, 它是文件中的 items 的一個子集。

scala  val linesWithSpark = textFile.filter(line =  line.contains( Spark))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

我們可以鏈式操作 transformation(轉換)和 action(動作):

scala  textFile.filter(line =  line.contains( Spark)).count() // How many lines contain  Spark ?
res3: Long = 15

Dataset 上的更多操作

Dataset actions(操作)和 transformations(轉換)可以用于更復雜的計算。例如, 統計出現次數最多的單詞 :

Scala

Python

scala  textFile.map(line =  line.split(  ).size).reduce((a, b) =  if (a   b) a else b)
res4: Long = 15

第一個 map 操作創建一個新的 Dataset, 將一行數據 map 為一個整型值。在 Dataset 上調用  reduce  來找到最大的行計數。參數  map  與  reduce  是 Scala 函數(closures), 并且可以使用 Scala/Java 庫的任何語言特性。例如, 我們可以很容易地調用函數聲明, 我們將定義一個 max 函數來使代碼更易于理解 :

scala  import java.lang.Math
import java.lang.Math
scala  textFile.map(line =  line.split(  ).size).reduce((a, b) =  Math.max(a, b))
res5: Int = 15

一種常見的數據流模式是被 Hadoop 所推廣的 MapReduce。Spark 可以很容易實現 MapReduce:

scala  val wordCounts = textFile.flatMap(line =  line.split(  )).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

在這里, 我們調用了  flatMap  以 transform 一個 lines 的 Dataset 為一個 words 的 Dataset, 然后結合  groupByKey  和  count  來計算文件中每個單詞的 counts 作為一個 (String, Long) 的 Dataset pairs。要在 shell 中收集 word counts, 我們可以調用  collect:

scala  wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

緩存

Spark 還支持 Pulling(拉取)數據集到一個群集范圍的內存緩存中。例如當查詢一個小的“hot”數據集或運行一個像 PageRANK 這樣的迭代算法時, 在數據被重復訪問時是非常高效的。舉一個簡單的例子, 讓我們標記我們的  linesWithSpark  數據集到緩存中:

Scala

Python

scala  linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala  linesWithSpark.count()
res8: Long = 15
scala  linesWithSpark.count()
res9: Long = 15

使用 Spark 來探索和緩存一個 100 行的文本文件看起來比較愚蠢。有趣的是, 即使在他們跨越幾十或者幾百個節點時, 這些相同的函數也可以用于非常大的數據集。您也可以像   編程指南. 中描述的一樣通過連接  bin/spark-shell  到集群中, 使用交互式的方式來做這件事情。

獨立的應用

假設我們希望使用 Spark API 來創建一個獨立的應用程序。我們在 Scala(SBT), Java(Maven)和 Python 中練習一個簡單應用程序。

Scala

Java

Python

我們將在 Scala 中創建一個非常簡單的 Spark 應用程序 – 很簡單的, 事實上, 它名為  SimpleApp.scala:

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp { def main(args: Array[String]) {
 val logFile =  YOUR_SPARK_HOME/README.md  // Should be some file on your system
 val spark = SparkSession.builder.appName(Simple Application).getOrCreate()
 val logData = spark.read.textFile(logFile).cache()
 val numAs = logData.filter(line =  line.contains( a)).count()
 val numBs = logData.filter(line =  line.contains( b)).count()
 println(s Lines with a: $numAs, Lines with b: $numBs)
 spark.stop()
 }
}

注意, 這個應用程序我們應該定義一個  main()  方法而不是去擴展  scala.App。使用  scala.App  的子類可能不會正常運行。

該程序僅僅統計了 Spark README 文件中每一行包含‘a’的數量和包含‘b’的數量。注意, 您需要將 YOUR_SPARK_HOME 替換為您 Spark 安裝的位置。不像先前使用 spark shell 操作的示例, 它們初始化了它們自己的 SparkContext, 我們初始化了一個 SparkContext 作為應用程序的一部分。

我們調用  SparkSession.builder  以構造一個 [[SparkSession]], 然后設置 application name(應用名稱), 最終調用  getOrCreate  以獲得 [[SparkSession]] 實例。

我們的應用依賴了 Spark API, 所以我們將包含一個名為  build.sbt  的 sbt 配置文件, 它描述了 Spark 的依賴。該文件也會添加一個 Spark 依賴的 repository:

name :=  Simple Project 
version :=  1.0 
scalaVersion :=  2.11.8 
libraryDependencies +=  org.apache.spark  %%  spark-sql  %  2.2.0

為了讓 sbt 正常的運行, 我們需要根據經典的目錄結構來布局  SimpleApp.scala  和  build.sbt  文件。在成功后, 我們可以創建一個包含應用程序代碼的 JAR 包, 然后使用  spark-submit  腳本來運行我們的程序。

# Your directory layout should look like this
$ find .
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
 --class  SimpleApp  \
 --master local[4] \
 target/scala-2.11/simple-project_2.11-1.0.jar
Lines with a: 46, Lines with b: 23

快速跳轉

恭喜您成功的運行了您的第一個 Spark 應用程序!

更多 API 的深入概述, 從  RDD programming guide  和  SQL programming guide  這里開始, 或者看看“編程指南”菜單中的其它組件。

為了在集群上運行應用程序, 請前往  deployment overview.

最后, 在 Spark 的  examples  目錄中包含了一些 (Scala, Java, Python, R) 示例。您可以按照如下方式來運行它們:

#  針對  Scala  和  Java,  使用  run-example:
./bin/run-example SparkPi
#  針對  Python  示例,  直接使用  spark-submit:
./bin/spark-submit examples/src/main/python/pi.py
#  針對  R  示例,  直接使用  spark-submit:
./bin/spark-submit examples/src/main/r/dataframe.R

讀到這里,這篇“如何使用 Spark Shell 進行交互式分析”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注丸趣 TV 行業資訊頻道。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-04發表,共計5000字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 仙游县| 扎赉特旗| 嫩江县| 泾川县| 三门县| 柘城县| 平原县| 永吉县| 黔东| 广州市| 德州市| 景宁| 隆安县| 鹤岗市| 台中县| 始兴县| 祥云县| 延吉市| 越西县| 阿合奇县| 定陶县| 璧山县| 平安县| 阜宁县| 通许县| 溧阳市| 秭归县| 马尔康县| 柳州市| 邵武市| 集贤县| 木兰县| 阿荣旗| 若羌县| 基隆市| 新郑市| 正定县| 高州市| 大庆市| 萨嘎县| 十堰市|