本地提交spark Spark 資料本地化級別

2021-10-14 10:35:48 字數 3529 閱讀 5455

​rdd 原始碼

大家可以看到原始碼中的第五條注釋說明,翻譯過來的大概意思是提供一系列的最佳計算位置。

我之前一直不太清楚 spark 是如何內部實現的,今天就帶領大家來看一看 spark 的本地資料化級別在任務執行中的演變過程。

spark 中任務的處理需要考慮資料的本地性,以 spark 1.6 為例,目前支援一下幾種。(中英文排版很頭疼,誰來幫幫我啊)

process_local程序本地化,表示 task 要計算的資料在同乙個 executor 中。

node_local節點本地化,速度稍慢,因為資料需要在不同的程序之間傳遞或從檔案中讀取。分為兩種情況,第一種:task 要計算的資料是在同乙個 worker 的不同 executor 程序中。第二種:task 要計算的資料是在同乙個 worker 的磁碟上,或在 hdfs 上恰好有 block 在同乙個節點上。如果 spark 要計算的資料**於 hdfsd 上,那麼最好的本地化級別就是 node_local。

no_pref沒有最佳位置,資料從哪訪問都一樣快,不需要位置優先。比如 spark sql 從 mysql 中讀取資料。

rack_local機架本地化,資料在同一機架的不同節點上。需要通過網路傳輸資料以及檔案 io,比 node_local 慢。情況一:task 計算的資料在 worker2 的 executor 中。情況二:task 計算的資料在 work2 的磁碟上。

any跨機架,資料在非同一機架的網路上,速度最慢。

如果不是很清楚,我畫(造)了一張圖放在這以供大家理解。

val rdd1 = sc.textfile("hdfs://tsl...") rdd1.cache()rdd1.map.filter.count()
上面這段簡單的**,背後其實做什麼很多事情。driver 的 taskscheduler 在傳送 task 之前,首先應該拿到 rdd1 資料所在的位置,rdd1 封裝了這個檔案所對應的 block 的位置,dagscheduler 通過呼叫 getprerredlocations() 拿到 partition 所對應的資料的位置,taskscheduler 根據這些位置來傳送相應的 task。

具體的解釋:

dagscheduler 切割job,劃分stage, 通過呼叫 submitstage 來提交乙個stage 對應的 tasks,submitstage 會呼叫 submitmissingtasks, submitmissingtasks 確定每個需要計算的 task 的preferredlocations,通過呼叫 getpreferrdelocations() 得到 partition 的優先位置,就是這個 partition 對應的 task 的優先位置,對於要提交到 taskscheduler 的 taskset 中的每乙個task,該 task 優先位置與其對應的 partition 對應的優先位置一致。

taskscheduler 接收到了 taskset 後,taskschedulerimpl 會為每個 taskset 建立乙個 tasksetmanager 物件,該物件包含taskset 所有 tasks,並管理這些 tasks 的執行,其中就包括計算 tasksetmanager 中的 tasks 都有哪些 locality levels,以便在排程和延遲排程 tasks 時發揮作用。

總的來說,spark 中的資料本地化是由 dagscheduler 和 taskscheduler 共同負責的。

計算節點與輸入資料位置的關係,下面以乙個圖來展開 spark 是如何讓進行排程的。這乙個過程會涉及 rdd, dagscheduler , taskscheduler。

第一步:process_local

taskscheduler 根據資料的位置向資料節點傳送 task 任務。如果這個任務在 worker1 的 executor 中等待了 3 秒。(預設的,可以通過spark.locality.wait 來設定),可以通過 sparkconf() 來修改,重試了 5 次之後,還是無法執行,taskscheduler 就會降低資料本地化的級別,從 process_local 降到 node_local。

第二步:node_local

taskscheduler 重新傳送 task 到 worker1 中的 executor2 中執行,如果 task 在worker1 的 executor2 中等待了 3 秒,重試了 5 次,還是無法執行,taskscheduler 就會降低資料本地化的級別,從 node_local 降到 rack_local。

第三步:rack_local

taskscheduler重新傳送 task 到 worker2 中的 executor1 中執行。

第四步:

當 task 分配完成之後,task 會通過所在的 worker 的 executor 中的 blockmanager 來獲取資料。如果 blockmanager 發現自己沒有資料,那麼它會呼叫 getremote() 方法,通過 connectionmanager 與原 task 所在節點的 blockmanager 中的 connectionmanager先建立連線,然後通過transferservice(網路傳輸元件)獲取資料,通過網路傳輸回task所在節點(這時候效能大幅下降,大量的網路io占用資源),計算後的結果返回給driver。這一步很像 shuffle 的檔案定址流程,spark 的 shuffle 檔案定址流程

taskscheduler在傳送task的時候,會根據資料所在的節點傳送task,這時候的資料本地化的級別是最高的,如果這個task在這個executor中等待了三秒,重試發**5次還是依然無法執行,那麼taskscheduler就會認為這個executor的計算資源滿了,taskscheduler會降低一級資料本地化的級別,重新傳送task到其他的executor中執行,如果還是依然無法執行,那麼繼續降低資料本地化的級別...

如果想讓每乙個 task 都能拿到最好的資料本地化級別,那麼調優點就是等待時間加長。注意!如果過度調大等待時間,雖然為每乙個 task 都拿到了最好的資料本地化級別,但是我們 job 執行的時間也會隨之延長。

下面是官方提供的引數說明:

​可以在**裡面這樣設定:

new sparkconf.set("spark.locality.wait

Git 本地提交

1.本地增加檔案 git add filename 2.本地刪除檔案 git rm filename git rm r dirname 和rm 的區別是,如果使用rm刪除會將刪除該檔案的操作提交上去 直觀的來講,git rm 刪除過的檔案,執行 git commit m abc 提交時,會自動將刪除...

Git提交本地專案

我們向遠端倉庫提交專案有兩種情況,一種是遠端倉庫新建了專案,從本地轉殖下來後再我們的專案放到轉殖下來的資料夾中,但是這樣會多一層目錄 另一種情況就是直接將本地專案推到遠端倉庫,也就是遠端倉庫不要新建空的專案資料夾,第二種情況操作有點複雜 1.cd到本地專案資料夾下 2.git init 初始化 3....

git 忽略本地提交

使用git命令git update index assume unchanged file 表示忽略跟蹤如果沒有什麼提示資訊,恭喜你,成功了,這時候再提交就發現不想提交的檔案不見了 如果提示你錯誤fatal unable to mark file file這時候可以把 file新增到你的.gitig...