spark利用cache優化shuffle

2021-10-13 17:58:44 字數 2542 閱讀 9410

cache表,資料放記憶體,資料被廣播到executor,

將多份資料進行關聯是資料處理過程中非常普遍的用法,不過在分布式計算系統中,這個問題往往會變的非常麻煩,因為框架提供的 join 操作一般會將所有資料根據 key 傳送到所有的 reduce 分割槽中去,也就是 shuffle 的過程。造成大量的網路以及磁碟io消耗,執行效率極其低下,這個過程一般被稱為 reduce-side-join。

如果其中有張表較小的話,我們則可以自己實現在 map 端實現資料關聯,跳過大量資料進行 shuffle 的過程,執行時間得到大量縮短,根據不同資料可能會有幾倍到數十倍的效能提公升,這個過程是map-side-join

reduce-side-join 的缺陷在於會將key相同的資料傳送到同乙個partition中進行運算,大資料集的傳輸需要長時間的io,同時任務併發度收到限制,還可能造成資料傾斜。

reduce-side-join 執行圖如下

map-side-join 執行圖如下

//快取全表

sqlcontext.sql("cache table activity")

//快取過濾結果

sqlcontext.sql("cache table activity_cached as select * from activity where ...")

cache table 是即時生效的,如果你想等到乙個action操作再快取資料可以使用 cache lazy table,這樣操作會直到乙個 action 操作才被觸發,例如 count(*)

sqlcontext.sql("cache lazy table ...")
取消hive表快取資料

sqlcontext.sql("uncache table activity")
示例:

我們也需要注意cachetable與uncachetable的使用時機,cachetable主要用於快取中間表結果,它的特點是少量資料且被後續計算(sql)頻繁使用;如果中間表結果使用完畢,我們應該立即使用uncachetable釋放快取空間,用於快取其它資料

val df = sqlcontext.sql("select * from activity")

df.registertemptable("activity_cached")

sqlcontext.cachetable("activity_cached")

tip:cachetable操作是lazy的,需要乙個action操作來觸發快取操作。

對應的uncachetable可以取消快取

sqlcontext.uncachetable("activity_cached")
val df = sqlcontext.sql("select * from tablename")

df.cache()

added rdd_xx_x in memory on ...
如果記憶體不足,則會存入磁碟中,提示如下:

added rdd_xx_x on disk on ...
快取資料後可以在storage上看到快取的資料

spark.sql.autobroadcastjointhreshold
該引數預設為10m,在進行join等聚合操作時,將小於該值的表broadcast到每台worker,消除了大量的shuffle操作。

spark.rdd.compress true
將rdd存入mem或disk前再進行一次壓縮,效果顯著,我使用cachetable了一張表,沒有開啟該引數前總共cache了54g資料,開啟這個引數後只34g,可是執行速度並沒有收到太大的影響。

spark.sql.shuffle.partitions
這個引數預設為200,是join等聚合操作的並行度,如果有大量的資料進行操作,造成單個任務比較重,執行時間過長的時候,會報如下的錯誤:

org.apache.spark.shuffle.fetchfailedexception: connection from /192.168.xx.***:53450 closed
這個時候需要提高該值。

spark的cache和checkpoint的區別

要知道區別,就要首先知道實現的原理和使用的場景 cache就是講共用的或者重複使用的rdd按照持久化的級別進行快取。checkpoint 就是將業務非常長的邏輯計算的中間結果快取到hdfs上,他的實現原理是 首先找打stage最後的finalrdd,然後按照rdd的依賴關係回溯,找到使用checkp...

spark中cache和checkpoint使用

1 cache cache是為了追求計算的速度 spark中計算任務在記憶體中,但是結果是儲存在磁碟中的,所以首次執行會慢,之後會拿磁碟中的計算結果,所以後面會快很多 通過對結果的rdd分布式資料集進行cache,將計算結果快取在記憶體中,這樣會比快取在磁碟中更快的讀取。比如計算log檔案的行數 s...

效能優化之cache

隨著cpu的頻率不斷提公升,而記憶體的訪問速度卻沒有質的突破,為了彌補訪問記憶體的速度慢,充分發揮cpu的計算資源,提高cpu整體吞吐量,在cpu與記憶體之間引入了一級cache。隨著熱點資料體積越來越大,一級cache l1已經不滿足發展的要求,引入了二級cache l2,cache l3cpu ...