spark調優 shuffle調優

2021-08-21 13:20:48 字數 2039 閱讀 2975

每乙個shuffle的前半部分stage的task,每個task都會建立下乙個stage的task數量相同的檔案,比如下乙個stage會有100個task,那麼當前stage每個task都會建立100份檔案,會將同乙個key對應的values,一定是寫入同乙個檔案中的,也一定會將同乙個key對應的values寫入下乙個stage,同乙個task對應的檔案中。

shuffle的後半部分stage的task,每個task都會從各個節點上的task寫的屬於自己的那乙份檔案中,拉取key,value對;然後task會有乙個記憶體快取區,然後用hashmap然後進行key-values進行聚合(key,values);

task 會用我們自己定義的聚合函式,進行聚合

shuffle,一定是分為倆個stage來完成的,因為這其實是個逆向的過程,不是stage決定shuffle,是shuffle決定stage

reducebykey(_ + _) 在某個action觸發job的時候,dagscheduler,會負責劃分job為多個stage.,劃分的依據,就是發現有會觸發shuffle操作的運算元,比如reducebykey,就將這個操作的前半部分,以及以前所有的rdd和transformation操作,劃分為乙個stage,

優化一:合併map端輸出檔案

new sparkconf().set("spark.shuffle.consolidatefiles","true")

開啟shufflemap端輸出檔案合併的機制,預設是不開啟的,就會發生下邊大量map端輸出檔案的操作,消耗大量的效能

如果不合併map端輸出檔案的話,會怎麼樣?

問題來了,預設的這種shuffle行為,對效能有什麼樣的惡略影響呢?

實際生產環境的條件:

100個節點,每個節點100個executor,:100個executor

每個executor:2個cpu core

總共1000個task,每個executor平均10個task  每個task輸出下個stage的task數量檔案

每個節點,10個task,會輸出多少分map端檔案 10 * 1000 = 1萬個檔案

總共有多少份map端輸出檔案?100 * 10000 = 100 萬

shuffle中的寫磁碟的操作,基本上就是shuffle中效能消耗最為嚴重的部分。

通過上面的分析,乙個普通的生產環境的spark job的乙個shuffle環節,會寫入磁碟100萬個檔案

磁碟io對效能和spark作業執行速度的影響,是及其驚人和嚇人的。

基本上,spark作業的效能,都消耗在shuffle中了,雖然不只是shuffle的map端輸出檔案這乙個部分,但是這裡也是非常大的乙個效能消耗點

開啟map端輸出檔案的合併機制後

第乙個stage,同時就執行cpu core個task,比如cpu core是2個,那麼就並行執行2個task;每個task都建立下乙個stage的task數量個檔案;

第乙個stage,並行執行的倆個task,執行完以後就會執行另外倆個task,另外2個task不會再建立輸出檔案;而是復用之前的task建立的map端輸出檔案,將資料寫入上一批task的輸出檔案中。

第二個stage,task在拉取資料的時候,就不會去拉取上乙個stage每個task為自己建立的那份輸出檔案了,而是拉取少量的輸出檔案,每個輸出檔案中,可能包含了多個task給自己的map端輸出。

提醒一下(map端輸出檔案合併)

只有並行執行的task回去建立新的輸出檔案;下一批 並行執行的task,就會去復用之前已有的輸出檔案,但是有乙個例外,比如2個task並行在執行,此時又啟動要執行2個task,就無法去復用剛才那倆個task建立的輸出檔案了,而是自己去建立新的輸出檔案

要實現輸出檔案合併的效果,必須是一批task先執行,然後下一批task再執行,才能復用之前的輸出檔案,否則多批task同時起來執行,還是做不到復用。

合併map端輸出檔案,對咱們的spark的效能有哪些方面的影響呢?

1. map task寫入磁碟檔案的io,減少:100萬 --> 20 萬

2. 第二個stage,原來要拉取第乙個stage的task數量份檔案,1000個task,第二stage都要拉取1000份檔案,要走網路傳輸

spark調優 shuffle調優

基於spark1.6 引數可以通過 new sparkcontext set 來設定,也可以通過命令的引數設定 conf spark.shuffle.file.buffer 預設值 32k 引數說明 該引數用於設定shuffle write task的bufferedoutputstream的buf...

七 Spark效能調優 Shuffle 調優

目錄 一 調節 map 端緩衝區大小 二 調節 reduce 端拉取資料緩衝區大小 三 調節 reduce 端拉取資料重試次數 四 調節 reduce 端拉取資料等待間隔 五 調節 sortshuffle 排序操作閾值 val conf new sparkconf set spark.shuffle...

Spark效能調優之Shuffle調優總結

spark底層shuffle的傳輸方式是使用netty傳輸,netty在進行網路傳輸的過程會申請堆外記憶體 netty是零拷貝 所以使用了堆外記憶體。shuffle過程中常出現的問題 常見問題一 reduce oom?問題原因 reduce task 去map端獲取資料,reduce一邊拉取資料一邊...