Hive的連線 join 方案

2021-08-11 02:37:20 字數 3711 閱讀 2764

一 common join/reduce side join/shuffle join

這三種其實都是一種連線方案:即在reduce端做join操作。一般情況下,如果不手動指定mapjoin或者不滿足mapjoin的條件,一般hive解析器會將join操作轉換成reducejoin. 他會經歷完整的map

->shuffle->reduce三個階段

map階段: 讀取表中資料,輸出的時候以joinkey作為outputkey,如果有多個關聯條件,然後這些會組合成乙個key, 輸出的時候以select欄位或者where條件需要的列作為outputvalue,同時還會包含表的tag資訊,以此標誌這個value屬於哪乙個表

shuffle階段:根據key進行partition操作並且根據key進行排序,可以確保相同的key在同乙個reduce之中

reduce階段:根據key值完成join操作,通過不同的tag來識別不同表中的資料

我們考慮個問題:在reduce端做join,肯定各個reduce端要去對應map端取資料,其中的shuffle過程想必都是知道的,所以如果表態大,會有大量的磁碟i/o和網路i/o操作,所以這種情況下,如果是大表之間join是不太適合這種方案的。

適合場景:

兩張表大小相當,而且表不是很大的適合可以使用這種方式,順便集合一下hive中reduce的調優是沒有問題的:

#比如開啟中間壓縮和最終輸出壓縮:但是cpu會增加,但是hdfs 寫的時間會減少

sethive.exec.compress.intermediate=true;

setmapred.map.output.compression.codec=org.apache.hadoop.io.

sethive.exec.compress.output=true;

setmapred.output.compression.codec=org.apache.hadoop.io.compress.

gzipcodec   

#設定set mapred.max.split.size調大一點:cpu可以適當的減少

二 mapside join/broadcast join

如果有一張小表和大表做join,那麼這個時候,我們使用reduce 端做join是不太划算的,因為我們完全可以避免去跑reduce。

在map端直接讀入小表進記憶體,然後在和大表join,join完畢,資料直接寫入hdfs,完全沒有必要去走reduce

首先我們需要確定什麼小表,它是由什麼決定的呢?它是由hive.mapjoin.smalltable.filesize來決定,預設是hive.mapjoin.smalltable.

filesize=25000000,大約23m左右。

比如這種:

select/*+mapjoin(time_dim)*/ count(*) from store_sales

sjoin time_dim t on s.ss_old_time_sk = t.t_tim_sk;

才能使用mapjoin,現在已經有引數可以控制是否自動轉換成mapjoin,這個轉換有引數:hive.auto.convert.join=true來控制,預設就是true.

原理:

1客戶端本地執行的task1,它會掃瞄小表的的資料,然後將其轉換成hashtable的資料結構,並寫入本地的檔案中,

/tmp/hadoop/479fb43b-6fc4-4aa8-846c-a052944ad02d/hive_2015-03-18_15-18-13_374_8641342742824793758-1/-local-10003/hashtable-stage-3/mapjoin-mapfile01--.hashtable

2之後將該檔案載入到distributedcache中,

uploaded1 file to:file:/tmp/hadoop/479fb43b-6fc4-4aa8-846c-a052944ad02d/hive_2017-03-18_15-18-13_374_8641342742824793758-1/-local-10003/hashtable-stage-3/mapjoin-mapfile01--.hashtable

3task 2 是乙個沒有reduce的task,啟動之後掃瞄大表,在map階段,根據大表每一條記錄然後去和distributedcache中對應hashtable關聯,然後輸出結果,所以有多少個maptask就有多少個檔案

三 smbjoin (sort-merge-bucket)

我們試想乙個場景:2張大表參與join, 都是上千萬或者上億條記錄,

假設這時候,這兩張表中小表可能資料太多,不太適合載入進記憶體,那麼mapjoin肯定是不合適的,那麼reducejoin呢?假設a表有資料4000萬條,b表有資料6000萬條,這時候reducejoin首先肯定大量網路i/o和磁碟i/o,另外a表資料需要逐條和b表資料進行匹配吧,這個效率肯定不行。所以smb join可以帶來效能很大的提公升

首先: 兩張表都是可以分桶的,在建立表的時候需要指定:

createtable(……) clustered by (col_name) sorted by

(col_name)into bucketsnum buckets

其次:兩張表分桶的列必須是join key

在匯入資料值前,我們需要設定hive.enfoce.bucketing=true,如果我們沒有設定,我們就需要設定與桶數匹配的reducer數目,並在查詢的時候需要新增cluster by子句

然後,我們希望smb join轉換成smb map join需要設定一下引數:

sethive.auto.convert.sortmerge.join=true; 

set hive.optimize.bucketmapjoin = true; 

set hive.optimize.bucketmapjoin.sortedmerge =true;    

原理:

桶中的資料可以根據乙個列或者多個列排序,這樣每個桶的join就變成了merge sort,可以進一步提公升map join的效率

真實資料中資料傾斜是一定的, hadoop 中預設是使用

hive.exec.reducers.bytes.per.reducer =1000000000

也就是每個節點的reduce 預設是處理1g大小的資料,如果你的join 操作也產生了資料傾斜,那麼你可以在hive 中設定

set hive.optimize.skewjoin = true; 

set hive.skewjoin.key = skew_key_threshold (default = 100000)

hive 在執行的時候沒有辦法判斷哪個key 會產生多大的傾斜,所以使用這個引數控制傾斜的閾值,如果超過這個值,新的值會傳送給那些還沒有達到的reduce, 一般可以設定成你

(處理的總記錄數/reduce個數)的2-4倍都可以接受.

傾斜是經常會存在的,一般select 的層數超過2層,翻譯成執行計畫多於3個以上的mapreduce job 都很容易產生傾斜,建議每次執行比較複雜的sql 之前都可以設一下這個引數. 如果你不知道設定多少,可以就按官方預設的1個reduce 只處理1g 的演算法,那麼  skew_key_threshold  =1g/平均行長. 或者預設直接設成250000000(差不多算平均行長4個位元組)

hive中所有的join連線

內連線 inner join join優化 在進行join的時候,大表放在最後面 但是使用 streamtable 大表名稱 來標記大表,那麼大表放在什麼位置都行了 select streamtable s s.ymd,d.dividend from stocks s inner join divi...

Hive中的Join操作

hive中有許多的join操作,如果left,right和full outer join,inner join,left semi join等。那麼它們都各自有什麼特點呢?感覺很難說明這些區別,還是通過例子來看。如果我們有乙個表,資料如下 a.txt 1,a 2,b3,c 4,d7,y 8,u另乙個...

HIVE中的join語句

hive支援通常的sql join語句,但是只支援等值連線。1.1 inner join 只有進行連線的兩個表都存在與連線標準相匹配的資料才會儲存下來 select a.ymd a.price b.price from stocks a join stocks b on a.ymd b.ymd wh...