Spark效能調優(十)之Spark統一記憶體管理

2021-08-02 05:36:22 字數 3856 閱讀 5460

一:memory manager

在spark 1.6 版本中,memorymanager 的選擇是由spark.memory.uselegacymode=false決定的。如果採用1.6之前的模型,這會使用staticmemorymanager來管理,否則使用新的unifiedmemorymanager,我們先看看1.6之前,對於乙個executor,記憶體都有哪些部分構成:

1,executionmemory。這片記憶體區域是為了解決 shuffles,joins, sorts and aggregations 過程中為了避免頻繁io需要的buffer。 通過spark.shuffle.memoryfraction(預設 0.2) 配置。

2,storagememory。這片記憶體區域是為了解決 block cache(就是你顯示呼叫dd.cache, rdd.persist等方法), 還有就是broadcasts,以及task results的儲存。可以通過引數 spark.storage.memoryfraction(預設0.6)設定。

3,othermemory。給系統預留的,因為程式本身執行也是需要記憶體的。 (預設為0.2).

另外,為了防止oom,一般而言都會有個safetyfraction,比如executionmemory 真正的可用記憶體是spark.shuffle.memoryfraction * spark.shuffle.safetyfraction 也就是0.8 * 0.2 ,只有16%的記憶體可用。

這種記憶體分配機制,最大的問題是,誰都不能超過自己的上限,規定了是多少就是多少,雖然另外一片記憶體閒著呢。這在是storagememory 和 executionmemory比較嚴重,他們都是消耗記憶體的大戶。這個問題引出了unified memory management模型,重點是打破executionmemory 和 storagememory 這種分明的界限。

二:othermemory

other memory在1.6也做了調整,保證至少有300m可用。你也可以手動設定 spark.testing.reservedmemory . 然後把實際可用記憶體減去這個reservedmemory得到 usablememory。 executionmemory 和 storagememory 會共享usablememory * 0.75的記憶體。0.75可以通過 新引數 spark.memory.fraction 設定。目前spark.memory.storagefraction 預設值是0.5,所以executionmemory,storagememory預設情況是均分上面提到的可用記憶體的。

三:unifiedmemorymanager

這個類提供了兩個核心的方法:

acquireexecutionmemory

acquirestoragememory

1,acquireexecutionmemory

每次申請executionmemory 的時候,都會呼叫 maybegrowexecutionpool方法,基於該方法我們可以得到幾個有意義的結論:

1.如果executionmemory 記憶體充足,則不會觸發向storage申請記憶體

2.每個task能夠被使用的記憶體被限制在 poolsize / (2 * numactivetasks) ~ maxpoolsize / numactivetasks 之間。

maxpoolsize = maxmemory - math.min(storagememoryused, storageregionsize)

poolsize = executionmemorypool.poolsize (當前executionmemorypool 所持有的記憶體)

3.如果executionmemory 的記憶體不足,則會觸發向storagememory索引要記憶體的操作。

如果executionmemory 的記憶體不足,則會向 storagememory要記憶體,具體怎麼樣呢? 看下面一句**就懂了:

val memoryreclaimablefromstorage = math.max(storagememorypool.memoryfree, storagememorypool.poolsize - storageregionsize)
看storagememorypool的剩餘記憶體和 storagememorypool 從executionmemory借來的記憶體那個大,取最大的那個,作為可以重新歸還的最大記憶體。用公式表達出來就是這乙個樣子:

executionmemory 能借到的最大記憶體= storagememory 借的記憶體 + storagememory 空閒記憶體。

當然,如果實際需要的小於能夠借到的最大值,則以實際需要值為準。下面的**體現了這個邏輯:

val spacereclaimed = storagememorypool.shrinkpooltofreespace(  

math.min(extramemoryneeded,memoryreclaimablefromstorage))

onheapexecutionmemorypool.incrementpoolsize(spacereclaimed)

2,acquirestoragememory

流程和acquireexecutionmemory類似,但是區別是,當且僅當executionmemory有空閒記憶體時,storagememory 才能借走該記憶體。這個邏輯體現在這行**上:

val memoryborrowedfromexecution = math.min(onheapexecutionmemorypool.memoryfree, numbytes)
所以storagememory從executionmemory借走的記憶體,完全取決於當時executionmemory是不是有空閒記憶體。

四:memorypool

前面講的是storagememory和executionmemory的互動。現在記憶體的具體表示則是由 memorypool完成的。

unifiedmemorymanage 維護了三個物件:

@guardedby("this")

protected val storagememorypool = new storagememorypool(this)

@guardedby("this")

protected val onheapexecutionmemorypool = new executionmemorypool(this, "on-heap execution")

@guardedby("this")

protected val offheapexecutionmemorypool = new executionmemorypool(this, "off-heap execution")

真實的記憶體計數其實都是由這幾個物件來完成的。比如

1.記憶體的借出借入

2.task目前記憶體的使用跟蹤

值的注意的是,我們以前知道,系統shuffle的時候,是可以使用in-heap /off-heap 記憶體的。在unifiedmemorymanage中,用了不同的物件來追蹤。如果你開啟的是offheapexecutionmemorypool,則不存在和storagememory的互動,也就沒有動態記憶體的概念了。

總結:1,理論上可以減少shuffle spill數,極端情況可能中間就沒有spill過程了,可以大大減少io次數。

2,如果你的記憶體太緊張,可能無法緩解問題。

3,如果你的程式具有偏向性,比如重度exectionmemory 或者storagememory 的某乙個,則可能會帶來比較好的效果。

Spark效能調優之Shuffle調優總結

spark底層shuffle的傳輸方式是使用netty傳輸,netty在進行網路傳輸的過程會申請堆外記憶體 netty是零拷貝 所以使用了堆外記憶體。shuffle過程中常出現的問題 常見問題一 reduce oom?問題原因 reduce task 去map端獲取資料,reduce一邊拉取資料一邊...

Spark效能調優 之 運算元調優(二)

map 表示每乙個元素 rrd.foreache 表示每乙個元素 rrd.forpartitions 表示每個分割槽的資料組成的迭代器 在生產環境中,通常使用foreachpartition運算元來完成資料庫的寫入,通過foreachpartition運算元的特性,可以優化寫資料庫的效能。如果使用f...

spark 效能調優

核心調優引數如下 num executors executor memory executor cores driver memory spark.default.parallelizm spark.storage.memoryfraction spark.shuffle.memoryfractio...