Spark (十) Spark 的種型別Join

2022-07-19 00:00:24 字數 4306 閱讀 6979

join是sql語句中的常用操作,良好的表結構能夠將資料分散在不同的表中,使其符合某種正規化,減少表冗餘、更新容錯等。而建立表和表之間關係的最佳方式就是join操作。

sparksql作為大資料領域的sql實現,自然也對join操作做了不少優化,今天主要看一下在sparksql中對於join,常見的3種實現。

大家知道,在資料庫的常見模型中(比如星型模型或者雪花模型),表一般分為兩種:事實表和維度表。維度表一般指固定的、變動較少的表,例如聯絡人、物品種類等,一般資料有限。而事實表一般記錄流水,比如銷售清單等,通常隨著時間的增長不斷膨脹。

因為join操作是對兩個表中key值相同的記錄進行連線,在sparksql中,對兩個表做join最直接的方式是先根據key分割槽,再在每個分割槽中把key值相同的記錄拿出來做連線操作。但這樣就不可避免地涉及到shuffle,而shuffle在spark中是比較耗時的操作,我們應該盡可能的設計spark應用使其避免大量的shuffle。

當維度表和事實表進行join操作時,為了避免shuffle,我們可以將大小有限的維度表的全部資料分發到每個節點上,供事實表使用。executor儲存維度表的全部資料,一定程度上犧牲了空間,換取shuffle操作大量的耗時,這在sparksql中稱作broadcast join,如下圖所示:

table b是較小的表,將其廣播到每個executor節點上,table a的每個partition會通過block manager取到table a的資料。根據每條記錄的join key取到table b中相對應的記錄,根據join type進行操作。這個過程比較簡單,不做贅述。

broadcast join的條件有以下幾個:

1. 被廣播的表需要小於spark.sql.autobroadcastjointhreshold所配置的值,預設是10m (或者加了broadcast join的hint)

2. 基表不能被廣播,比如left outer join時,只能廣播右表

看起來廣播是乙個比較理想的方案,但它有沒有缺點呢?也很明顯。這個方案只能用於廣播較小的表,否則資料的冗餘傳輸就遠大於shuffle的開銷;另外,廣播時需要將被廣播的表現collect到driver端,當頻繁有廣播出現時,對driver的記憶體也是乙個考驗。

當一側的表比較小時,我們選擇將其廣播出去以避免shuffle,提高效能。但因為被廣播的表首先被collect到driver段,然後被冗餘分發到每個executor上,所以當表比較大時,採用broadcast join會對driver端和executor端造成較大的壓力。

但由於spark是乙個分布式的計算引擎,可以通過分割槽的形式將大批量的資料劃分成n份較小的資料集進行平行計算。這種思想應用到join上便是shuffle hash join了。利用key相同必然分割槽相同的這個原理,sparksql將較大表的join分而治之,先將表劃分成n個分割槽,再對兩個表中相對應分割槽的資料分別進行hash join,這樣即在一定程度上減少了driver廣播一側表的壓力,也減少了executor端取整張被廣播表的記憶體消耗。其原理如下圖:

shuffle hash join分為兩步:

1. 對兩張表分別按照join keys進行重分割槽,即shuffle,目的是為了讓有相同join keys值的記錄分到對應的分割槽中

2. 對對應分割槽中的資料進行join,此處先將小表分割槽構造為一張hash表,然後根據大表分割槽中記錄的join keys值拿出來進行匹配

shuffle hash join的條件有以下幾個:

1. 分割槽的平均大小不超過spark.sql.autobroadcastjointhreshold所配置的值,預設是10m 

2. 基表不能被廣播,比如left outer join時,只能廣播右表

3. 一側的表要明顯小於另外一側,小的一側將被廣播(明顯小於的定義為3倍小,此處為經驗值)

我們可以看到,在一定大小的表中,sparksql從時空結合的角度來看,將兩個表進行重新分割槽,並且對小表中的分割槽進行hash化,從而完成join。在保持一定複雜度的基礎上,儘量減少driver和executor的記憶體壓力,提公升了計算時的穩定性。

上面介紹的兩種實現對於一定大小的表比較適用,但當兩個表都非常大時,顯然無論適用哪種都會對計算記憶體造成很大壓力。這是因為join時兩者採取的都是hash join,是將一側的資料完全載入到記憶體中,使用hash code取join keys值相等的記錄進行連線。

當兩個表都非常大時,sparksql採用了一種全新的方案來對錶進行join,即sort merge join。這種實現方式不用將一側資料全部載入後再進星hash join,但需要在join前將資料排序,如下圖所示:

可以看到,首先將兩張表按照join keys進行了重新shuffle,保證join keys值相同的記錄會被分在相應的分割槽。分割槽後對每個分區內的資料進行排序,排序後再對相應的分區內的記錄進行連線,如下圖示:

看著很眼熟吧?也很簡單,因為兩個序列都是有序的,從頭遍歷,碰到key相同的就輸出;如果不同,左邊小就繼續取左邊,反之取右邊。

可以看出,無論分割槽有多大,sort merge join都不用把某一側的資料全部載入到記憶體中,而是即用即取即丟,從而大大提公升了大資料量下sql join的穩定性。

package

cn.edu360.spark08

import

org.apache.spark.sql.

object jointest

}

執行結果:

== physical plan ==

*(1) broadcasthashjoin [id#5], [aid#14], inner, buildright

:- localtablescan [id#5, token#6]

+- broadcastexchange hashedrelationbroadcastmode(list(cast(input[0, int, false

] as bigint)))

+- localtablescan [aid#14, atoken#15]

+---+-------+---+------+

| id| token|aid|atoken|

+---+-------+---+------+

| 0|playing| 0| p|

| 1| with| 1| w|

| 2| join| 2| s|

+---+-------+---+------+

從上面的資料可以看出,預設執行的是broadcasthashjoin。

5.2 sortmergejoin實現

package

cn.edu360.spark08

import

org.apache.spark.sql.

object jointest

}

輸出結果:

== physical plan ==

*(3) sortmergejoin [id#5], [aid#14], inner

:- *(1) sort [id#5 asc nulls first], false, 0: +- exchange hashpartitioning(id#5, 200)

: +- localtablescan [id#5, token#6]

+- *(2) sort [aid#14 asc nulls first], false, 0

+- exchange hashpartitioning(aid#14, 200)

+- localtablescan [aid#14, atoken#15]

+---+-------+---+------+

| id| token|aid|atoken|

+---+-------+---+------+

| 1| with| 1| w|

| 2| join| 2| s|

| 0|playing| 0| p|

+---+-------+---+------+

shuffle在可以自己定義好分割槽,然後進行join操作。

Spark效能調優(十)之Spark統一記憶體管理

一 memory manager 在spark 1.6 版本中,memorymanager 的選擇是由spark.memory.uselegacymode false決定的。如果採用1.6之前的模型,這會使用staticmemorymanager來管理,否則使用新的unifiedmemorymana...

關於Spark和Spark的學習資料

hadoop社群依然發展迅速,2014年推出了2.3,2.4,2.5 的社群版本,比如增強 resource manager ha,yarn rest api,acl on hdfs,改進 hdfs 的 web ui hadoop roadmap 根據我的觀察,主要更新在yarn,hdfs,而map...

Spark系列 二 Spark的資料讀入

真的是超級忙碌的一周,所幸的是我們迎來了新的家庭成員乙隻小貓咪 大王。取名為大王的原因竟然是因為之前作為流浪貓的日子總是被其他貓所欺負,所以希望他能做乙隻霸氣的霸王貓啦。言歸正傳,在周一見的悲傷中唯有寫一篇部落格才能緩解我的憂傷吧。spark讀取文字檔案 textfile def textfile ...