spark shuffle 相關細節整理

2022-03-01 21:07:52 字數 2316 閱讀 6892

1.shuffle write 和shuffle read具體發生在**

2.**用到了partitioner

3.何為mapsidecombine

4.何時進行排序

之前已經看過spark shuffle原始碼了,現在總結一下一些之前沒有理解的小知識點,作為乙個總結。

使用者自定義的partitioner存到了**?

假設使用者在呼叫reducebykey時,傳遞了乙個自定義的partitioner,那麼,這個partitioner會被儲存到shufflerdd的shuffledependency中。在進行shuffle write時,會使用這個partitioner來對finalrdd.iterator(partition)的計算結果shuffle到不同的bucket中。

何為mapsidecombine

reducebykey預設是開啟了mapsidecombine的,在進行shuffle write時會進行本地聚合,在shuffle read時,也會合併一下。舉乙個例子更好:

shuffle write階段:

partition0:[(hello,1),(hello,1)]

partition1:[(hello,1),(word,1),(word,1)]

mapsidecombine後:

partition0:[(hello,2)]

partition1:[(hello,1),(word,2)]

hash shuffle後:

[(hello,2),(hello,1)]

[(word,2)]

hash read階段:

[(hello,3)]

[(word,2)]

何時排序

排序操作發生在shuffle read 階段。在shuffle read 進行完mapsidecombine之後,就開始進行排序了。

reducebykey做了什麼?

假設我們對rdd1呼叫了reducebykey,那麼最終的rdd依賴關係如下:rdd1->shufflerdd。rdd1.reducebykey中,會做如下非常重要的事情:建立shufflerdd,在建立shufflerdd的過程中最最最重要的就是會建立shuffledependency,這個shuffledependency中有aggregator,partitioner,ordering,parentrdd,mapsidecombine等重要的資訊。為什麼說shuffledependency非常重要,因為他是溝通shuffle writer和shuffle reader的乙個重要橋梁。

shuffle write

shuffle write 發生在shufflemaptask.runtask中。首先反序列出rdd1和那個shuffledependency:(rdd1,dep),然後呼叫rdd1.iterator(partition)獲取計算結果,再對計算結果進行shufflewriter,**如下:

override def runtask(context: taskcontext): mapstatus = 

catch

} catch

throw

e }

}

我們以hashsufflewriter為例,在其write(),他就會用到mapsidecombine和partitioner。如下:

/**

write a bunch of records to this task's output

*/override def write(records: iterator[product2[k, v]]): unit =

else

} else

for (elem <-iter)

}

shuffle read

shuffle read發生在shufflerdd的compute中:

override def compute(split: partition, context: taskcontext): iterator[(k, c)] =

下面是hashshufflereader的read():

/**

read the combined key-values for this reduce task

*/override def read(): iterator[product2[k, c]] =

else

} else

//sort the output if there is a sort ordering defined.

dep.keyordering match

}

Spark Shuffle記憶體分析

分布式系統裡的shuffle 階段往往是非常複雜的,而且分支條件也多,我只能按著我關注的線去描述。肯定會有不少謬誤之處,我會根據自己理解的深入,不斷更新這篇文章。前言用spark寫程式,乙個比較蛋疼的地方就是oom,或者gc嚴重,導致程式響應緩慢,一般這種情況都會出現在shuffle階段。shuff...

Spark shuffle流程細則

hadoop中的shuffle存在map任務和reduce任務之間,而spark中的shuffle過程存在stage之間。shuffle操作分為兩種,分別是寫操作和讀操作。基於排序的shuffle操作 基於雜湊的shuffle操作會產生很多檔案,這對檔案系統來說是乙個非誠大的負擔,而且在總資料量不大...

感知機相關難點細解

感知機 1.感知機是一種線性分類模型,而且只針對二分類問題。如果對於一組二分類資料其不能找到乙個超平面將所有資料正確劃分,那麼感知機模型將不可收斂。2感知機定義模型表示式 w為權重向量,b為偏置,x是輸入向量,也就是乙個樣本的特徵向量。f x 的結果即為x表示的樣本 的類別,結果為 1或 1.sig...