Spark的Shuffle過程介紹

2021-08-08 07:39:05 字數 2048 閱讀 2511

spark的shuffle過程介紹

shuffle writer

spark豐富了任務型別,有些任務之間資料流轉不需要通過shuffle,但是有些任務之間還是需要通過shuffle來傳遞資料,比如wide dependency的group by key。

spark中需要shuffle輸出的map任務會為每個reduce建立對應的bucket,map產生的結果會根據設定的partitioner得到對應的bucketid,然後填充到相應的bucket中去。每個map的輸出結果可能包含所有的reduce所需要的資料,所以每個map會建立r個bucket(r是reduce的個數),m個map總共會建立m*r個bucket。

map建立的bucket其實對應磁碟上的乙個檔案,map的結果寫到每個bucket中其實就是寫到那個磁碟檔案中,這個檔案也被稱為blockfile,是disk block manager管理器通過檔名的hash值對應到本地目錄的子目錄中建立的。每個map要在節點上建立r個磁碟檔案用於結果輸出,map的結果是直接輸出到磁碟檔案上的,100kb的記憶體緩衝是用來建立fast buffered outputstream輸出流。這種方式乙個問題就是shuffle檔案過多。

針對上述shuffle過程產生的檔案過多問題,spark有另外一種改進的shuffle過程:consolidation shuffle,以期顯著減少shuffle檔案的數量。在consolidation shuffle中每個bucket並非對應乙個檔案,而是對應檔案中的乙個segment部分。job的map在某個節點上第一次執行,為每個reduce建立bucket對應的輸出檔案,把這些檔案組織成shufflefilegroup,當這次map執行完之後,這個shufflefilegroup可以釋放為下次迴圈利用;當又有map在這個節點上執行時,不需要建立新的bucket檔案,而是在上次的shufflefilegroup中取得已經建立的檔案繼續追加寫乙個segment;當前次map還沒執行完,shufflefilegroup還沒有釋放,這時如果有新的map在這個節點上執行,無法迴圈利用這個shufflefilegroup,而是只能建立新的bucket檔案組成新的shufflefilegroup來寫輸出。

比如乙個job有3個map和2個reduce:(1) 如果此時集群有3個節點有空槽,每個節點空閒了乙個core,則3個map會排程到這3個節點上執行,每個map都會建立2個shuffle檔案,總共建立6個shuffle檔案;(2) 如果此時集群有2個節點有空槽,每個節點空閒了乙個core,則2個map先排程到這2個節點上執行,每個map都會建立2個shuffle檔案,然後其中乙個節點執行完map之後又排程執行另乙個map,則這個map不會建立新的shuffle檔案,而是把結果輸出追加到之前map建立的shuffle檔案中;總共建立4個shuffle檔案;(3) 如果此時集群有2個節點有空槽,乙個節點有2個空core乙個節點有1個空core,則乙個節點排程2個map乙個節點排程1個map,排程2個map的節點上,乙個map建立了shuffle檔案,後面的map還是會建立新的shuffle檔案,因為上乙個map還正在寫,它建立的shufflefilegroup還沒有釋放;總共建立6個shuffle檔案。

shuffle fetcher

reduce去拖map的輸出資料,spark提供了兩套不同的拉取資料框架:通過socket連線去取資料;使用netty框架去取資料。

每個節點的executor會建立乙個blockmanager,其中會建立乙個blockmanagerworker用於響應請求。當reduce的get_block的請求過來時,讀取本地檔案將這個blockid的資料返回給reduce。如果使用的是netty框架,blockmanager會建立shufflesender用於傳送shuffle資料。

並不是所有的資料都是通過網路讀取,對於在本節點的map資料,reduce直接去磁碟上讀取而不再通過網路框架。

reduce拖過來資料之後以什麼方式儲存呢?spark map輸出的資料沒有經過排序,spark shuffle過來的資料也不會進行排序,spark認為shuffle過程中的排序不是必須的,並不是所有型別的reduce需要的資料都需要排序,強制地進行排序只會增加shuffle的負擔。reduce拖過來的資料會放在乙個hashmap中,hashmap中儲存的也是

Spark的Shuffle過程介紹

spark豐富了任務型別,有些任務之間資料流轉不需要通過shuffle,但是有些任務之間還是需要通過shuffle來傳遞資料,比如wide dependency的group by key。spark中需要shuffle輸出的map任務會為每個reduce建立對應的bucket,map產生的結果會根據...

Spark 的 Shuffle過程介紹

spark豐富了任務型別,有些任務之間資料流轉不需要通過shuffle,但是有些任務之間還是需要通過shuffle來傳遞資料,比如wide dependency的group by key。spark中需要shuffle輸出的map任務會為每個reduce建立對應的bucket,map產生的結果會根據...

hadoop和spark的shuffle異同點

spark 裡是shufflemaptask 的輸出進行 partition 不同的 partition 送到不同的 reducer spark 裡reducer 可能是下乙個 stage 裡的shufflemaptask 也可能是 resulttask reducer 以記憶體作緩衝區,邊 shu...