一文搞清楚 Spark 資料本地化級別

2021-09-19 09:37:10 字數 3672 閱讀 7106

文章首發於:大資料進擊之路

rdd 原始碼

大家可以看到原始碼中的第五條注釋說明,翻譯過來的大概意思是提供一系列的最佳計算位置。

我之前一直不太清楚 spark 是如何內部實現的,今天就帶領大家來看一看 spark 的本地資料化級別在任務執行中的演變過程。

spark 中任務的處理需要考慮資料的本地性,以 spark 1.6 為例,目前支援一下幾種。(中英文排版很頭疼,誰來幫幫我啊)

process_local程序本地化,表示 task 要計算的資料在同乙個 executor 中。

node_local節點本地化,速度稍慢,因為資料需要在不同的程序之間傳遞或從檔案中讀取。分為兩種情況,第一種:task 要計算的資料是在同乙個 worker 的不同 executor 程序中。第二種:task 要計算的資料是在同乙個 worker 的磁碟上,或在 hdfs 上恰好有 block 在同乙個節點上。如果 spark 要計算的資料**於 hdfsd 上,那麼最好的本地化級別就是 node_local。

no_pref沒有最佳位置,資料從哪訪問都一樣快,不需要位置優先。比如 spark sql 從 mysql 中讀取資料。

rack_local機架本地化,資料在同一機架的不同節點上。需要通過網路傳輸資料以及檔案 io,比 node_local 慢。情況一:task 計算的資料在 worker2 的 executor 中。情況二:task 計算的資料在 work2 的磁碟上。

any跨機架,資料在非同一機架的網路上,速度最慢。

如果不是很清楚,我畫(造)了一張圖放在這以供大家理解。

上面這段簡單的**,背後其實做什麼很多事情。driver 的 taskscheduler 在傳送 task 之前,首先應該拿到 rdd1 資料所在的位置,rdd1 封裝了這個檔案所對應的 block 的位置,dagscheduler 通過呼叫 getprerredlocations() 拿到 partition 所對應的資料的位置,taskscheduler 根據這些位置來傳送相應的 task。

具體的解釋:

dagscheduler 切割job,劃分stage, 通過呼叫 submitstage 來提交乙個stage 對應的 tasks,submitstage 會呼叫 submitmissingtasks, submitmissingtasks 確定每個需要計算的 task 的preferredlocations,通過呼叫 getpreferrdelocations() 得到 partition 的優先位置,就是這個 partition 對應的 task 的優先位置,對於要提交到 taskscheduler 的 taskset 中的每乙個task,該 task 優先位置與其對應的 partition 對應的優先位置一致。

taskscheduler 接收到了 taskset 後,taskschedulerimpl 會為每個 taskset 建立乙個 tasksetmanager 物件,該物件包含taskset 所有 tasks,並管理這些 tasks 的執行,其中就包括計算 tasksetmanager 中的 tasks 都有哪些 locality levels,以便在排程和延遲排程 tasks 時發揮作用。

總的來說,spark 中的資料本地化是由 dagscheduler 和 taskscheduler 共同負責的。

3計算節點與輸入資料位置的關係,下面以乙個圖來展開 spark 是如何讓進行排程的。這乙個過程會涉及 rdd, dagscheduler , taskscheduler。

**第一步:******process_local ****

taskscheduler 根據資料的位置向資料節點傳送 task 任務。如果這個任務在 worker1 的 executor 中等待了 3 秒。(預設的,可以通過spark.locality.wait 來設定),可以通過 sparkconf() 來修改,重試了 5 次之後,還是無法執行,taskscheduler 就會降低資料本地化的級別,從 process_local 降到 node_local。

第二步:**node_local

taskscheduler 重新傳送 task 到 worker1 中的 executor2 中執行,如果 task 在worker1 的 executor2 中等待了 3 秒,重試了 5 次,還是無法執行,taskscheduler 就會降低資料本地化的級別,從 node_local 降到 rack_local。

第三步:**rack_local

taskscheduler重新傳送 task 到 worker2 中的 executor1 中執行。

第四步:

當 task 分配完成之後,task 會通過所在的 worker 的 executor 中的 blockmanager 來獲取資料。如果 blockmanager 發現自己沒有資料,那麼它會呼叫 getremote() 方法,通過 connectionmanager 與原 task 所在節點的 blockmanager 中的 connectionmanager先建立連線,然後通過transferservice(網路傳輸元件)獲取資料,通過網路傳輸回task所在節點(這時候效能大幅下降,大量的網路io占用資源),計算後的結果返回給driver。這一步很像 shuffle 的檔案定址流程,spark 的 shuffle 檔案定址流程

4taskscheduler在傳送task的時候,會根據資料所在的節點傳送task,這時候的資料本地化的級別是最高的,如果這個task在這個executor中等待了三秒,重試發**5次還是依然無法執行,那麼taskscheduler就會認為這個executor的計算資源滿了,taskscheduler會降低一級資料本地化的級別,重新傳送task到其他的executor中執行,如果還是依然無法執行,那麼繼續降低資料本地化的級別…

如果想讓每乙個 task 都能拿到最好的資料本地化級別,那麼調優點就是等待時間加長。注意!如果過度調大等待時間,雖然為每乙個 task 都拿到了最好的資料本地化級別,但是我們 job 執行的時間也會隨之延長。

下面是官方提供的引數說明:

可以在**裡面這樣設定:

new sparkconf.set("spark.locality.wait","100")
如果對您有幫助,歡迎關注、**。

一次搞清楚ajax和axios fetch的區別

前端請求基於ajax的時代已逐漸成為將要成為歷史了,es6的fetch和node的axios將會逐漸代替它,本篇博文將就這三者的區別做以比較方便對後兩者的理解和使用。ajax ajax error function axios axios 是乙個基於 promise 的 http 庫,可以用在瀏覽器...

bat中幾個要搞清楚的東西(一)

bat中enabledelayedexpansion用來設定變數的延遲擴充套件,通過以下語句可以開啟延遲擴充套件,setlocal enabledelayedexpansion這樣設定後變數的擴充套件時間將發生在execution時而不是變數 parse時,可以通過下邊的例子看出其作用。echo o...

一文清楚MySQL邏輯架構

學習mysql如果在腦子裡對該資料庫系統的各個元件如何協作工作的流程很清晰的話,那麼肯定就會深入理解mysql伺服器,學習mysql是這樣,其他事情也是這樣。伺服器會事先生成執行緒池,每個客戶端連線伺服器都會在伺服器的程序中歸屬於乙個執行緒,客戶端只會在自己歸屬的執行緒中執行查詢操作,伺服器會負責快...