Spark的五種JOIN策略解析

2021-10-10 05:40:32 字數 3526 閱讀 9211

join操作是非常常見的資料處理操作,spark作為乙個統一的大資料處理引擎,提供了非常豐富的join場景。本文分享將介紹spark所提供的5種join策略,希望對你有所幫助。本文主要包括以下內容:

參與join的資料集的大小會直接影響join操作的執行效率。同樣,也會影響join機制的選擇和join的執行效率。

join的條件會涉及字段之間的邏輯比較。根據join的條件,join可分為兩大類:等值連線非等值連線。等值連線會涉及乙個或多個需要同時滿足的相等條件。在兩個輸入資料集的屬性之間應用每個等值條件。當使用其他運算子(運算連線符不為**=**)時,稱之為非等值連線。

在輸入資料集的記錄之間應用連線條件之後,join型別會影響join操作的結果。主要有以下幾種join型別:

spark提供了5種join機制來執行具體的join操作。該5種join機制如下所示:

簡介當要join的表資料量比較大時,可以選擇shuffle hash join。這樣可以將大表進行按照join的key進行重分割槽,保證每個相同的join key都傳送到同乙個分割槽中。如下圖示:

如上圖所示:shuffle hash join的基本步驟主要有以下兩點:

條件與特點簡介

也稱之為map端join。當有一張表較小時,我們通常選擇broadcast hash join,這樣可以避免shuffle帶來的開銷,從而提高效能。比如事實表與維表進行join時,由於維表的資料通常會很小,所以可以使用broadcast hash join將維表進行broadcast。這樣可以避免資料的shuffle(在spark中shuffle操作是很耗時的),從而提高join的效率。在進行 broadcast join 之前,spark 需要把處於 executor 端的資料先傳送到 driver 端,然後 driver 端再把資料廣播到 executor 端。如果我們需要廣播的資料比較多,會造成 driver 端出現 oom。具體如下圖示:

broadcast hash join主要包括兩個階段:

條件與特點

longmetric(

"datasize"

)+= datasize

if(datasize >=(8l

<<30)

) gb"

)}

簡介

該join機制是spark預設的,可以通過引數spark.sql.join.prefersortmergejoin進行配置,預設是true,即優先使用sort merge join。一般在兩張大表進行join時,使用該方式。sort merge join可以減少集群中的資料傳輸,該方式不會先載入所有資料的到記憶體,然後進行hashjoin,但是在join之前需要對join key進行排序。具體圖示:

sort merge join主要包括三個階段:

條件與特點簡介

如果 spark 中兩張參與 join 的表沒指定join key(on 條件)那麼會產生 cartesian product join,這個 join 得到的結果其實就是兩張行數的乘積。

條件簡介

該方式是在沒有合適的join機制可供選擇時,最終會選擇該種join策略。優先順序為:broadcast hash join > sort merge join > shuffle hash join > cartesian join > broadcast nested loop join.

在cartesian 與broadcast nested loop join之間,如果是內連線,或者非等值連線,則優先選擇broadcast nested loop策略,當時非等值連線並且一張表可以被廣播時,會選擇cartesian join。

條件與特點

有join提示(hints)的情況,按照下面的順序

沒有join提示(hints)的情況,則逐個對照下面的規則有join提示(hints),按照下面的順序

沒有join提示(hints),則逐個對照下面的規則

object joinselection extends strategy

with predicatehelper

with joinselectionhelper

}def createshufflehashjoin(onlylookingathint:

boolean)=

}def createsortmergejoin()=

else

}def createcartesianproduct()=

else

}def createjoinwithouthint()=

else

}.orelse(createsortmergejoin())

.orelse(createcartesianproduct())

.getorelse

} createbroadcasthashjoin(

true

).orelse

.orelse(createshufflehashjoin(

true))

.orelse

.getorelse(createjoinwithouthint())

if(canbuildleft(jointype)

) buildleft else buildright

}def createbroadcastnljoin(buildleft:

boolean

, buildright:

boolean)=

else

if(buildleft)

else

if(buildright)

else

maybebuildside.map

}def createcartesianproduct()=

else

}def createjoinwithouthint()=

} createbroadcastnljoin(hinttobroadcastleft(hint)

, hinttobroadcastright(hint)

).orelse

.getorelse(createjoinwithouthint())

case _ => nil}}

本文主要介紹了spark提供的5種join策略,並對三種比較重要的join策略進行了圖示解析。首先對影響join的因素進行了梳理,然後介紹了5種spark的join策略,並對每種join策略的具體含義和觸發條件進行了闡述,最後給出了join策略選擇對應的原始碼片段。希望本文能夠對你有所幫助。

Spark的五種Join策略

join操作是非常常見的資料處理操作,spark作為乙個統一的大資料處理引擎,提供了非常豐富的join場景。本文分享將介紹spark所提供的5種join策略,希望對你有所幫助。本文主要包括以下內容 參與join的資料集的大小會直接影響join操作的執行效率。同樣,也會影響join機制的選擇和join...

Spark (十) Spark 的種型別Join

join是sql語句中的常用操作,良好的表結構能夠將資料分散在不同的表中,使其符合某種正規化,減少表冗餘 更新容錯等。而建立表和表之間關係的最佳方式就是join操作。sparksql作為大資料領域的sql實現,自然也對join操作做了不少優化,今天主要看一下在sparksql中對於join,常見的3...

Spark和Hive處理資料傾斜的兩種解決方案

比如處理80tb的資料,partition數量為15000,理論上平均每個節點是5 6g的資料,但是實際上根據key value在儲存時,很有可能因為某個key的數量特別多,導致資料傾斜。這樣就會出現超過物理記憶體限制的報錯。偶爾重試可能會通過,但是會比較不穩定 目前我們這邊的兩種解決辦法是 1.如...