久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何進行Apache Spark源碼分析Job的提交與運行

157次閱讀
沒有評論

共計 2959 個字符,預計需要花費 8 分鐘才能閱讀完成。

如何進行 Apache Spark 源碼分析 Job 的提交與運行,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

下面以 wordCount 為例,詳細說明 spark 創建和運行 job 的過程,重點是在進程及線程的創建。

實驗環境搭建

在進行后續操作前,確保下列條件已滿足。

1. 下載 spark binary 0.9.1

2. 安裝 scala

3. 安裝 sbt

4. 安裝 java

啟動 spark-shell 單機模式運行,即 local 模式

local 模式運行非常簡單,只要運行以下命令即可,假設當前目錄是 $SPARK_HOME

MASTER=local bin/spark-shell

MASTER=local 就是表明當前運行在單機模式

local cluster 方式運行

localcluster 模式是一種偽 cluster 模式,在單機環境下模擬 standalone 的集群,啟動順序分別如下

1. 啟動 master

2. 啟動 worker

3. 啟動 spark-shell

master$SPARK_HOME/sbin/start-master.sh

注意運行時的輸出,日志默認保存在 $SPARK_HOME/logs 目錄。

master 主要是運行類  org.apache.spark.deploy.master.Master,在 8080 端口啟動監聽,日志如下圖所示

修改配置

1. 進入 $SPARK_HOME/conf 目錄

2. 將 spark-env.sh.template 重命名為 spark-env.sh

3. 修改 spark-env.sh,添加如下內容

export SPARK_MASTER_IP=localhostexport SPARK_LOCAL_IP=localhost 運行 workerbin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077 -i 127.0.0.1 -c 1 -m 512M

worker 啟動完成,連接到 master。打開 maser 的 webui 可以看到連接上來的 worker. Master WEb UI 的監聽地址是 http://localhost:8080

啟動 spark-shellMASTER=spark://localhost:7077 bin/spark-shell

如果一切順利,將看到下面的提示信息。

Created spark context..Spark context available as sc.

可以用瀏覽器打開 localhost:4040 來查看如下內容

1. stages

2. storage

3. environment

4. executors

wordcount

上述環境準備妥當之后,我們在 sparkshell 中運行一下最簡單的例子,在 spark-shell 中輸入如下代碼

scala sc.textFile(README.md).filter(_.contains( Spark)).count

上述代碼統計在 README.md 中含有 Spark 的行數有多少

部署過程詳解

Spark 布置環境中組件構成如下圖所示。

 

Driver Program  簡要來說在 spark-shell 中輸入的 wordcount 語句對應于上圖的 Driver Program.

Cluster Manager  就是對應于上面提到的 master,主要起到 deploy management 的作用

Worker Node  與 Master 相比,這是 slave node。上面運行各個 executor,executor 可以對應于線程。executor 處理兩種基本的業務邏輯,一種就是 driver   programme, 另一種就是 job 在提交之后拆分成各個 stage,每個 stage 可以運行一到多個 task

Notes:  在集群 (cluster) 方式下, Cluster Manager 運行在一個 jvm 進程之中,而 worker 運行在另一個 jvm 進程中。在 local cluster 中,這些 jvm 進程都在同一臺機器中,如果是真正的 standalone 或 Mesos 及 Yarn 集群,worker 與 master 或分布于不同的主機之上。

JOB 的生成和運行

job 生成的簡單流程如下

1. 首先應用程序創建 SparkContext 的實例,如實例為 sc

2. 利用 SparkContext 的實例來創建生成 RDD

3. 經過一連串的 transformation 操作,原始的 RDD 轉換成為其它類型的 RDD

4. 當 action 作用于轉換之后 RDD 時,會調用 SparkContext 的 runJob 方法

5. sc.runJob 的調用是后面一連串反應的起點,關鍵性的躍變就發生在此處

調用路徑大致如下

1. sc.runJob- dagScheduler.runJob- submitJob

2. DAGScheduler::submitJob 會創建 JobSummitted 的 event 發送給內嵌類 eventProcessActor

3. eventProcessActor 在接收到 JobSubmmitted 之后調用 processEvent 處理函數

4. job 到 stage 的轉換,生成 finalStage 并提交運行,關鍵是調用 submitStage

5. 在 submitStage 中會計算 stage 之間的依賴關系,依賴關系分為寬依賴和窄依賴兩種

6. 如果計算中發現當前的 stage 沒有任何依賴或者所有的依賴都已經準備完畢,則提交 task

7. 提交 task 是調用函數 submitMissingTasks 來完成

8. task 真正運行在哪個 worker 上面是由 TaskScheduler 來管理,也就是上面的 submitMissingTasks 會調用 TaskScheduler::submitTasks

9. TaskSchedulerImpl 中會根據 Spark 的當前運行模式來創建相應的 backend, 如果是在單機運行則創建 LocalBackend

10. LocalBackend 收到 TaskSchedulerImpl 傳遞進來的 ReceiveOffers 事件

11. receiveOffers- executor.launchTask- TaskRunner.run

代碼片段 executor.lauchTask

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {   val tr = new TaskRunner(context, taskId, serializedTask)    runningTasks.put(taskId, tr)    threadPool.execute(tr) }

說了這么一大通,也就是講最終的邏輯處理切切實實是發生在 TaskRunner 這么一個 executor 之內。

運算結果是包裝成為 MapStatus 然后通過一系列的內部消息傳遞,反饋到 DAGScheduler,這一個消息傳遞路徑不是過于復雜。

看完上述內容,你們掌握如何進行 Apache Spark 源碼分析 Job 的提交與運行 的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注丸趣 TV 行業資訊頻道,感謝各位的閱讀!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計2959字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 炉霍县| 新化县| 灵璧县| 朝阳县| 南岸区| 枝江市| 柳江县| 湛江市| 安陆市| 石首市| 鹿泉市| 翁源县| 贵港市| 湟源县| 合川市| 论坛| 滕州市| 宝鸡市| 桓台县| 乐昌市| 柯坪县| 邯郸县| 启东市| 临武县| 青河县| 温泉县| 南和县| 屏东县| 富川| 镇沅| 阿拉善右旗| 和硕县| 陆河县| 永川市| 祁门县| 开化县| 镇宁| 芦溪县| 灵璧县| 措勤县| 宁南县|