Spark面對OOM問題的解決方法及優化總結

2021-09-11 18:04:57 字數 4726 閱讀 3688

記憶體溢位解決方法:

1. map過程產生大量物件導致記憶體溢位:

這種溢位的原因是在單個map中產生了大量的物件導致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.tostring),這個操作在rdd中,每個物件都產生了10000個物件,這肯定很容易產生記憶體溢位的問題。針對這種問題,在不增加記憶體的情況下,可以通過減少每個task的大小,以便達到每個task即使產生大量的物件executor的記憶體也能夠裝得下。具體做法可以在會產生大量物件的map操作之前呼叫repartition方法,分割槽成更小的塊傳入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.tostring)。

面對這種問題注意,不能使用rdd.coalesce方法,這個方法只能減少分割槽,不能增加分割槽,不會有shuffle的過程。

2.資料不平衡導致記憶體溢位:

資料不平衡除了有可能導致記憶體溢位外,也有可能導致效能的問題,解決方法和上面說的類似,就是呼叫repartition重新分割槽。這裡就不再累贅了。

3.coalesce呼叫導致記憶體溢位:

這是我最近才遇到的乙個問題,因為hdfs中不適合存小問題,所以spark計算後如果產生的檔案太小,我們會呼叫coalesce合併檔案再存入hdfs中。但是這會導致乙個問題,例如在coalesce之前有100個檔案,這也意味著能夠有100個task,現在呼叫coalesce(10),最後只產生10個檔案,因為coalesce並不是shuffle操作,這意味著coalesce並不是按照我原本想的那樣先執行100個task,再將task的執行結果合併成10個,而是從頭到位只有10個task在執行,原本100個檔案是分開執行的,現在每個task同時一次讀取10個檔案,使用的記憶體是原來的10倍,這導致了oom。解決這個問題的方法是令程式按照我們想的先執行100個task再將結果合併成10個檔案,這個問題同樣可以通過repartition解決,呼叫repartition(10),因為這就有乙個shuffle的過程,shuffle前後是兩個stage,乙個100個分割槽,乙個是10個分割槽,就能按照我們的想法執行。

4.shuffle後記憶體溢位:

shuffle記憶體溢位的情況可以說都是shuffle後,單個檔案過大導致的。在spark中,join,reducebykey這一型別的過程,都會有shuffle的過程,在shuffle的使用,需要傳入乙個partitioner,大部分spark中的shuffle操作,預設的partitioner都是hashpatitioner,預設值是父rdd中最大的分割槽數,這個引數通過spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism引數只對hashpartitioner有效,所以如果是別的partitioner或者自己實現的partitioner就不能使用spark.default.parallelism這個引數來控制shuffle的併發量了。如果是別的partitioner導致的shuffle記憶體溢位,就需要從partitioner的**增加partitions的數量。

5. standalone模式下資源分配不均勻導致記憶體溢位:

在standalone的模式下如果配置了--total-executor-cores 和 --executor-memory 這兩個引數,但是沒有配置--executor-cores這個引數的話,就有可能導致,每個executor的memory是一樣的,但是cores的數量不同,那麼在cores數量多的executor中,由於能夠同時執行多個task,就容易導致記憶體溢位的情況。這種情況的解決方法就是同時配置--executor-cores或者spark.executor.cores引數,確保executor資源分配均勻。

6.在rdd中,共用物件能夠減少oom的情況:

這個比較特殊,這裡說記錄一下,遇到過一種情況,類似這樣rdd.flatmap(x=>for(i <- 1 to 1000) yield ("key","value"))導致oom,但是在同樣的情況下,使用rdd.flatmap(x=>for(i <- 1 to 1000) yield "key"+"value")就不會有oom的問題,這是因為每次("key","value")都產生乙個tuple物件,而"key"+"value",不管多少個,都只有乙個物件,指向常量池。具體測試如下:

這個例子說明("key","value")和("key","value")在記憶體中是存在不同位置的,也就是存了兩份,但是"key"+"value"雖然出現了兩次,但是只存了乙份,在同乙個位址,這用到了jvm常量池的知識.於是乎,如果rdd中有大量的重複資料,或者array中需要存大量重複資料的時候我們都可以將重複資料轉化為string,能夠有效的減少記憶體使用.

2.broadcast join和普通join:

在大資料分布式系統中,大量資料的移動對效能的影響也是巨大的。基於這個思想,在兩個rdd進行join操作的時候,如果其中乙個rdd相對小很多,可以將小的rdd進行collect操作然後設定為broadcast變數,這樣做之後,另乙個rdd就可以使用map操作進行join,這樣能夠有效的減少相對大很多的那個rdd的資料移動。

3.先filter在join:

這個就是謂詞下推,這個很顯然,filter之後再join,shuffle的資料量會減少,這裡提一點是spark-sql的優化器已經對這部分有優化了,不需要使用者顯示的操作,個人實現rdd的計算的時候需要注意這個。

5. combinebykey的使用:

這個操作在map-reduce中也有,這裡舉個例子:rdd.groupbykey().mapvalue(_.sum)比rdd.reducebykey的效率低,原因如下兩幅圖所示(網上盜來的,侵刪)

上下兩幅圖的區別就是上面那幅有combinebykey的過程減少了shuffle的資料量,下面的沒有。combinebykey是key-value型rdd自帶的api,可以直接使用。

6. 在記憶體不足的使用,使用rdd.persist(storagelevel.memory_and_disk_ser)代替rdd.cache():

rdd.cache()和rdd.persist(storage.memory_only)是等價的,在記憶體不足的時候rdd.cache()的資料會丟失,再次使用的時候會重算,而rdd.persist(storagelevel.memory_and_disk_ser)在記憶體不足的時候會儲存在磁碟,避免重算,只是消耗點io時間。

7.在spark使用hbase的時候,spark和hbase搭建在同乙個集群:

在spark結合hbase的使用中,spark和hbase最好搭建在同乙個集群上上,或者spark的集群節點能夠覆蓋hbase的所有節點。hbase中的資料儲存在hfile中,通常單個hfile都會比較大,另外spark在讀取hbase的資料的時候,不是按照乙個hfile對應乙個rdd的分割槽,而是乙個region對應乙個rdd分割槽。所以在spark讀取hbase的資料時,通常單個rdd都會比較大,如果不是搭建在同乙個集群,資料移動會耗費很多的時間。

引數優化部分:

8. spark.driver.memory (default 1g):

這個引數用來設定driver的記憶體。在spark程式中,sparkcontext,dagscheduler都是執行在driver端的。對應rdd的stage切分也是在driver端執行,如果使用者自己寫的程式有過多的步驟,切分出過多的stage,這部分資訊消耗的是driver的記憶體,這個時候就需要調大driver的記憶體。

9. spark.rdd.compress (default false) :

這個引數在記憶體吃緊的時候,又需要persist資料有良好的效能,就可以設定這個引數為true,這樣在使用persist(storagelevel.memory_only_ser)的時候,就能夠壓縮記憶體中的rdd資料。減少記憶體消耗,就是在使用的時候會占用cpu的解壓時間。

11. spark.memory.storagefraction (default 0.5)

這個引數設定記憶體表示 executor記憶體中 storage/(storage+execution),雖然spark-1.6.0+的版本記憶體storage和execution的記憶體已經是可以互相借用的了,但是借用和贖回也是需要消耗效能的,所以如果明知道程式中storage是多是少就可以調節一下這個引數。

12.spark.locality.wait (default 3s):

spark中有4中本地化執行level,process_local->node_local->rack_local->any,乙個task執行完,等待spark.locality.wait時間如果,第一次等待process的task到達,如果沒有,等待任務的等級下調到node再等待spark.locality.wait時間,依次類推,直到any。分布式系統是否能夠很好的執行本地檔案對效能的影響也是很大的。如果rdd的每個分割槽資料比較多,每個分割槽處理時間過長,就應該把 spark.locality.wait 適當調大一點,讓task能夠有更多的時間等待本地資料。特別是在使用persist或者cache後,這兩個操作過後,在本地機器呼叫記憶體中儲存的資料效率會很高,但是如果需要跨機器傳輸記憶體中的資料,效率就會很低。

13. spark.speculation (default false):

乙個大的集群中,每個節點的效能會有差異,spark.speculation這個引數表示空閒的資源節點會不會嘗試執行還在執行,並且執行時間過長的task,避免單個節點執行速度過慢導致整個任務卡在乙個節點上。這個引數最好設定為true。與之相配合可以一起設定的引數有spark.speculation.×開頭的引數。參考中有文章詳細說明這個引數。

以後有遇到新的內容再補充。

參考:1.

2. 3.

4.

Spark 遇到OOM怎麼解決

map執行中記憶體溢位 shuffle後記憶體溢位 spark 記憶體模型 spark在乙個executor中的記憶體分為三塊 一塊是execution記憶體,一塊是storage記憶體,一塊是other記憶體。當spark程式中,存在過多的小任務的時候,可以通過 rdd.coalesce方法,收縮...

oom問題解決

dalvik虛擬機會為應用程式分配固定大小的heap 如果使用超過了這個heap的大小,且沒有可被 物件,就會報oom。多張較大會迅速占用空間造成oom。我們可以使用一下的方法來減少這種情況的產生 1.減少單張的大小,根據螢幕大小來對bitmap做resize。private void setima...

Spark 中OOM的現象 原因 解決方案和總結

參考 spark的executor的container記憶體有兩大部分組成 堆外記憶體和executor記憶體。堆外記憶體 有spark.yarn.executor.memeoryoverhead引數設定。如果沒有設定,則使用 val executormemoryoverhead sparkconf...