解決spark中遇到的資料傾斜問題

2021-08-13 18:19:43 字數 2106 閱讀 8284

多數task執行速度較快,少數task執行時間非常長,或者等待很長時間後提示你記憶體不足,執行失敗。

常見於各種shuffle操作,例如reducebykey,groupbykey,join等操作。

key本身分布不均勻(包括大量的key為空)

key的設定不合理

shuffle時的併發度不夠

計算方式有誤

spark中乙個stage的執行時間受限於最後那個執行完的task,因此執行緩慢的任務會拖累整個程式的執行速度(分布式程式執行的速度是由最慢的那個task決定的)。

過多的資料在同乙個task中執行,將會把executor撐爆,造成oom,程式終止執行。

乙個理想的分布式程式: 

發生資料傾斜時,任務的執行速度由最大的那個任務決定: 

發現資料傾斜的時候,不要急於提高executor的資源,修改引數或是修改程式,首先要檢查資料本身,是否存在異常資料。

如果任務長時間卡在最後最後1個(幾個)任務,首先要對key進行抽樣分析,判斷是哪些key造成的。

選取key,對資料進行抽樣,統計出現的次數,根據出現次數大小排序取出前幾個

df.select("key").sample(false,0.1).(k=>(k,1)).reducebykey(_+_).map(k=>(k._2,k._1)).sortbykey(false).take(10)
如果發現多數資料分布都較為平均,而個別資料比其他資料大上若干個數量級,則說明發生了資料傾斜。

經過分析,傾斜的資料主要有以下三種情況:

null(空值)或是一些無意義的資訊()之類的,大多是這個原因引起。

無效資料,大量重複的測試資料或是對結果影響不大的有效資料。

有效資料,業務導致的正常資料分布。

第1,2種情況,直接對資料進行過濾即可。

第3種情況則需要進行一些特殊操作,常見的有以下幾種做法。

隔離執行,將異常的key過濾出來單獨處理,最後與正常資料的處理結果進行union操作。

對key先新增隨機值,進行操作後,去掉隨機值,再進行一次操作。

使用reducebykey代替groupbykey使用map join。

如果使用reducebykey因為資料傾斜造成執行失敗的問題。具體操作如下:

將原始的key轉化為key + 隨機值(例如random.nextint)

對資料進行reducebykey(func)key + 隨機值轉成key再對資料進行reducebykey(func)

tip1: 如果此時依舊存在問題,建議篩選出傾斜的資料單獨處理。最後將這份資料與正常的資料進行union即可。

tips2: 單獨處理異常資料時,可以配合使用map join解決。

dataframesparksql可以設定spark.sql.shuffle.partitions引數控制shuffle的併發度,預設為200。 

rdd操作可以設定spark.default.parallelism控制併發度,預設引數由不同的cluster manager控制。

侷限性: 只是讓每個task執行更少的不同的key。無法解決個別key特別大的情況造成的傾斜,如果某些key的大小非常大,即使乙個task單獨執行它,也會受到資料傾斜的困擾。

在小表不是特別大(取決於你的executor大小)的情況下使用,可以使程式避免shuffle的過程,自然也就沒有資料傾斜的困擾了。

侷限性: 因為是先將小資料傳送到每個executor上,所以資料量不能太大。

具體使用方法和處理流程參照:

spark map-side-join 關聯優化

spark join broadcast優化

解決 spark 中的資料傾斜問題

發現資料傾斜的時候,不要急於提高 executor 的資源,修改引數 或是修改程式,首先要檢查資料本身,是否存在異常資料。1 資料問題造成的資料傾斜 找出異常的 key 如果任務長時間卡在最後最後 1 個 幾個 任務,首先要對 key 進行 抽樣分析,判斷是哪些 key 造成的。選取 key,對資料...

spark解決資料傾斜問題

參考 資料傾斜發生的原理 資料傾斜的原理很簡單 在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的乙個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應的資料量特別大的話,就會發生資料傾斜。比如大部分key對應10條資料,但是個別key卻對應了...

Spark 資料傾斜

計算資料時,資料分散度不夠,導致大量資料集中到一台或幾台機器上計算。區域性計算遠低於平均計算速度,整個過程過慢。部分任務處理資料量過大,可能oom,任務失敗,進而應用失敗。1 executor lost driver oom shuffle過程出錯 2 正常執行任務突然失敗 3 單個executor...