(三)Spark學習系列

2021-07-11 12:34:57 字數 3414 閱讀 6548

本章節講一講spark的shuffle模組

shuffle模組作用是將若干node節點上面的資料重新分割,再劃分到不同的節點中, 也就是將上乙個stage中的各個task的中間結果整合起來,然後再重新分組,以供下乙個stage的task對它們做運算。原因就是spark的設計就是把相具有某種共同特徵的一類資料需要匯聚到乙個計算節點上面進行計算。   所以shuffle的過程就像是把牌收集到了一起,洗一下,重新再發給牌友一樣。

那麼這裡面的關鍵問題是:

1、乙個shuffletask的輸入來自於多個前置的task的匯聚,那麼,shuffletask上面的記憶體使用量會增加。

2、為了節約頻寬,可能需要將資料壓縮以後再傳

3、需要通過網路傳輸,序列化和反序列化也是個問題。

針對第1個問題,spark中的shuffle需要將資料匯聚,那麼進行資料匯聚的那個點的資料是來自於多個其他的節點的,所以,可能就會存在單個節點的記憶體不夠用的情況,這個時候,spark會將中間的資料寫到磁碟裡面。(*寫檔案然後又要讀檔案就會導致了spark執行時候效率變慢。)

spark為什麼要持久化shuffle的結果

task可以做到在記憶體中計算,某個過程出錯後,可以從頭開始。但是對於shuffle來說,一旦資料丟失了,需要從新計算這個結果,代價有點大,因為丟給shuffle的結果一般都是經過了好多個rdd後的結果。所以,shuffle被設計成為了可以持久化。

持久化的時機是shuffle中的任務計算完成後,這個過程其實是叫做shuffle write過程,從spark0.8以後,shuffle write 就被設計成為需要寫磁碟。

shuffle結果是怎麼樣傳送給下游的task的?

shuffle任務做完了以後,會通知scheduler.mapstatus,然後下游的task會通過這個mapstatus拉資料

如何確定shuffle後檔案的數量呢?

首先,writer通過shuffledependency#partition#numpartition獲得需要生成的partition的數量,這個partition的數量是和下游的task的數量是相等的。對於單個task說,task處理的資料應該被設計成為可以完全能夠載入記憶體,所以partition的大小的選擇要能夠滿足task能夠在記憶體中將任務做完。每個task處理乙個partition。

shuffle演算法

hashbasedshuffle

在spark中,shuffle的資料是沒有排序的,所以,在shuffle的過程中會快。不過,正是由於資料是沒有排序的,那麼就不曉得資料的上下界,其實有的時候就會有不必要的網路開銷。如果一開始就能夠保證資料是有序的,那麼後面的排序過程其實並沒有那麼多的時間開銷。所以,感覺這個排序不排序是個需要折衷的方案。在mapreduce中,資料是需要排序的,在goldfish中,資料也是預先進行排序的,在gf中,資料排序後的收益是顯著的。

需要注意的是,shuffle的結果只會寫到本地檔案系統,下游的task對應的task可能會來自於多台機器。

執行時候的圖如下:shuffle也不是單節點的shuffle(spark也不會那麼傻= =),多個shuffle任務共同做shuffle,然後後繼的task再匯聚自己想要的資料。

有的問題:

(1)、開啟檔案的數量 , 比如上圖中,檔案就有followingtask*shufftask數目 4*4 = 16個

(2)、開啟檔案讀取後記憶體占用

(3)、隨機讀取問題

除此之外,我想到的還有讀取檔案的排隊問題,導致了下層任務推進變慢。

為了結果shuffle過程中產生檔案過多的問題。在spark0.8.1中加入了shuffle consolidate files 機制。

shuffle consolidate files 機制

這個機制將屬於乙個core的同乙個partition的不同的shuffle task 輸出到同乙個 partition file中。圖示如下:

此外,除了hash based shuffle,還有sort based shuffle

sort based shuffle

hash based shuffle 的乙個問題是,好生成的中間檔案太多了。在sort based sort 中,同乙個shuffle map task的輸出會寫到乙個檔案裡面,同時,還會生成乙個index 檔案。reducer通過這個index檔案來讀取到自己所需要的輸入檔案。sort based shuffle按照partition的id對資料進行排序,同乙個partition的key是不會進行sort的。對於那種需要排序的操作,比如sortbykey是通過reducer來完成的。

那麼,這具體是怎麼做到的呢?

首先,spark為每個partition在記憶體裡面都建立了array,然後,將屬於某個array的k/v對插入到這個array。這麼看起來,與其說是排序,不如說就是在記憶體中對資料進行分組。

當某個array的記憶體超過閾值,就會將這個array的資料寫到磁碟,建立這個array的檔案,記錄partitionid,還有資料量。最後,將外部儲存的檔案進行歸併排序,最後生成乙個大檔案,然後再生成乙個index檔案,作為索引,索引每個partition的起始位置。個人理解其實這裡就有問題了,問題①乙個array的記憶體超過閾值,不能夠代表其他的array占用了很多空間,這個時候,記憶體可能還有很多的空間,因為其它的array可能沒啥資料。問題②如果乙個array就建立了乙個檔案,那麼這個sortbased在寫資料上面的效能又退化到了hash的方式其實,此外還有乙個是歸併排序的時間消耗問題。那麼spark之所以這樣做的話,應該就是開啟了檔案後然後就關閉,因為中間有時間是錯開的,那麼在某個時刻,開啟的檔案數可能並不多,還有就是array在記憶體中充當了乙個緩衝區的作用,這樣使得資料量是批量寫入的,這樣的話,可以加大寫檔案的效率,還有就是如果有的array並不超多閾值,實際上是不用寫檔案的。 對此我的改進措施是  寫到乙個大的檔案加index向量的方式,不過這樣做依然會有問題,那就是,隨著時間的推移,本來在乙個partition的資料,會被分散在兩個很遠的磁碟位置上面,這樣可能會增加磁碟尋道路的可能

如何計算出partition的數量?

計算的partition的個數依然是和following task 的數目是一樣的。

在following task 如何 在index中,如果確定哪些部分的資料是自己要處理的呢?

shuffle map task運算結果的處理

這個和(二)中的一樣。涉及到了executor和driver端對資料的處理。

spark學習系列

以spark原始碼為參照分析模式匹配及種類 graphx 核心理解 spark 核心排程理解 效能spark效能相關引數配置 搜狗實驗室 sogou labs 富貴有定數,學問則無定數。求一分,便得一分 關於博主 許鵬,花名 徽滬一郎,2000年畢業於南京郵電學院,現就業於愛立信上海,在udm部門從...

Spark系列 三 Spark的工作機制

什麼時候才能回歸到看 寫感想的日子呀 剛剛交完房租的我血槽已空。看了師妹關於spark報告的ppt,好懷念學生時代開組會的時光啊,雖然我已經離開學校不長不短兩個月,但我還是非常認真的翻閱了,並作為大自然的搬運工來搬運知識了。1 local,本地執行,通過多執行緒來實現平行計算。2 本地偽集群執行模式...

Spark學習系列一

1 spark 是什麼?spark是乙個快速的處理大規模資料的通用工具。它是乙個基於記憶體計算框架 包含核心元件 spark core 互動式查詢 spark sql 準實時流式計算 spark streaming 機器學習 spark mllib 圖計 算 spark graphx 2 spark...