spark更改分割槽 Spark中的分割槽方法詳解

2021-10-13 11:15:57 字數 2680 閱讀 5634

一、spark資料分割槽方式簡要

在spark中,rdd(resilient distributed dataset)是其最基本的抽象資料集,其中每個rdd是由若干個partition組成。在job執行期間,參與運算的partition資料分布在多台機器的記憶體當中。這裡可將rdd看成乙個非常大的陣列,其中partition是陣列中的每個元素,並且這些元素分布在多台機器中。圖一中,rdd1包含了5個partition,rdd2包含了3個partition,這些partition分布在4個節點中。

spark包含兩種資料分割槽方式:hashpartitioner(雜湊分割槽)和rangepartitioner(範圍分割槽)。一般而言,對於初始讀入的資料是不具有任何的資料分割槽方式的。資料分割槽方式只作用於形式的資料。因此,當乙個job包含shuffle操作型別的運算元時,如groupbykey,reducebykey etc,此時就會使用資料分割槽方式來對資料進行分割槽,即確定某乙個key對應的鍵值對資料分配到哪乙個partition中。在spark shuffle階段中,共分為shuffle write階段和shuffle read階段,其中在shuffle write階段中,shuffle map task對資料進行處理產生中間資料,然後再根據資料分割槽方式對中間資料進行分割槽。最終shffle read階段中的shuffle read task會拉取shuffle write階段中產生的並已經分好區的中間資料。圖2中描述了shuffle階段與partition關係。下面則分別介紹spark中存在的兩種資料分割槽方式。

二、hashpartitioner(雜湊分割槽)

1、hashpartitioner原理簡介

hashpartitioner採用雜湊的方式對鍵值對資料進行分割槽。其資料分割槽規則為 partitionid = key.hashcode % numpartitions,其中partitionid代表該key對應的鍵值對資料應當分配到的partition標識,key.hashcode表示該key的雜湊值,numpartitions表示包含的partition個數。圖3簡單描述了hashpartitioner的資料分割槽過程。

2、hashpartitioner原始碼詳解

hashpartitioner原始碼較為簡單,這裡不再進行詳細解釋。

classhashpartitioner(partitions: int) extends partitioner override def equals(other: any): boolean =other match override def hashcode: int =numpartitions

def nonnegativemod(x: int, mod: int): int=.collect()

val numitems=sketched.map(_._2).sum

(numitems, sketched)

④ 資料抽樣完成後,需要對不均衡的partition重新進行抽樣,預設當partition中包含的資料量大於平均值的三倍時,該partition是不均衡的。當取樣完成後,利用樣本容量和rdd中包含的資料總量,可以得到整體的乙個資料取樣率fraction。利用此取樣率對不均衡的partition呼叫sample運算元重新進行抽樣。

//計算資料取樣率

val fraction = math.min(samplesize / math.max(numitems, 1l), 1.0)//存放取樣key以及取樣權重

val candidates =arraybuffer.empty[(k, float)]//存放不均衡的partition

val imbalancedpartitions =mutable.set.empty[int]//(idx, n, sample)=> (partition id, 當前分割槽資料個數,當前partition的取樣資料)

sketched.foreach //在三倍之內的認為沒有發生資料傾斜

else//對於非均衡的partition,重新採用sample運算元進行抽樣

if(imbalancedpartitions.nonempty)

bounds.toarray

⑥ 計算每個key所在partition:當分割槽範圍長度在128以內,使用順序搜尋來確定key所在的partition,否則使用二分查詢演算法來確定key所在的partition。

* 獲得每個key所在的partitionid*/def getpartition(key: any): int=

}//範圍大於128,則進行二分搜尋該key所在範圍,即可得到該key所在的partitionid

elseif (partition >rangebounds.length) if(ascending) else

具體案例:對list裡面的單詞進行wordcount,並且輸出按照每個單詞的長度分割槽輸出到不同檔案裡面

classmypartitioner(val num:int) extends partitioner )//這裡指定自定義分割槽,然後輸出

println(rdd2.collect().tobuffer)

sc.stop()

結果:因為這裡定義的是4個partition 所以最後產生4個檔案

其中part-00000 和 part-00001如下:

其中part-00002 和 part-00003如下:

其中part-00000中zhangsan的長度對4取模為0和這個檔案中其他較短的單詞一樣,所以在乙個分割槽, part-00003沒有內容,說明上面的單詞的長度對4取模結果沒有為3的

spark更改分割槽 如何管理Spark的分割槽

當我們使用spark載入資料來源並進行一些列轉換時,spark會將資料拆分為多個分割槽partition,並在分割槽上並行執行計算。所以理解spark是如何對資料進行分割槽的以及何時需要手動調整spark的分割槽,可以幫助我們提公升spark程式的執行效率。什麼是分割槽 關於什麼是分割槽,其實沒有什...

spark分割槽器

spark的分割槽器 只有涉及到 key value 型別的rdd才會用到分割槽器,因為分割槽是以key分割槽的 spark中分割槽器直接決定了rdd中分割槽的個數 rdd中每條資料經過shuffle過程屬於哪個分割槽和reduce的個數。a hashpartitioner 預設分割槽器 hash分...

Spark中RDD分割槽以及節點

spark中rdd分割槽 對於二元rdd使用時,例如在使用join 時 我們對資料集是如何分割槽的卻一無所知。預設情況下,連線操作會將兩個資料集中的所有鍵的雜湊值都求出來,將該雜湊值相同的記錄通過網路傳到同一臺機器 上,然後在那台機器上對所有鍵相同的記錄進行連線操作,會非常消耗效能,如果乙個資料集設...