MapReduce Join聯結實現

2021-06-07 02:18:24 字數 4686 閱讀 2086

一、背景

早在8月份的時候,我就做了一些mr的join查詢,但是發現回北京之後,2個月不用,居然有點生疏,所以今天早上又花時間好好看了一下,順便寫下這個文件,以供以後查閱。

二、環境

jdk 1.6、linux作業系統、hadoop0.20.2

三、資料

資料在做這個join查詢的時候,必然涉及資料,我這裡設計了2張表,分別較

data

.txt和info.txt,字段之間以\t劃分。

data.txt內容如下:

201001 1003 abc

201002 1005 def

201003 1006 ghi

201004 1003 jkl

201005 1004 mno

201006 1005 pqr

info.txt內容如下:

1003 kaka

1004 da

1005 jue

1006 zhao

期望輸出結果:

1003 201001 abc kaka

1003 201004 jkl kaka

1004 201005 mno da

1005 201002 def jue

1005 201006 pqr jue

1006 201003 ghi zhao

四、map**

首先是map的**,我貼上,然後簡要說說

@override

protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception else

}if (pathname.contains("info.txt")) else }}

}這裡需要注意以下部分:

a、pathname是檔案在hdfs中的全路徑(例如:

hdfs

://m1:9000/dajuezhao/join/data/info.txt),可以以endswith()的方法來判斷。

b、資料表,也就是這裡的info.txt需要放在前面,也就是標識號是0.否則無法輸出理想結果。

c、map執行完成之後,輸出的中間結果如下:

1003,1 201001 abc

1003,1 201004 jkl

1004,1 201005 mon

1005,1 201002 def

1005,1 201006 pqr

1006,1 201003 ghi

1003,0 kaka

1004,0 da

1005,0 jue

1006,0 zhao

五、分割槽和分組

1、map之後的輸出會進行一些分割槽的操作,**貼出來: 

public static class example_join_01_partitioner extends partitioner

}分割槽我在以前的文件中寫過,這裡不做描述了,就說是按照map輸出的符合key的第乙個欄位做分割槽關鍵字。分割槽之後,相同key會劃分到乙個

reduce

中去處理(如果reduce設定是1,那麼就是分割槽有多個,但是還是在乙個reduce中處理。但是結果會按照分割槽的原則排序)。分割槽後結果大致如下:

同一區:

1003,1 201001 abc

1003,1 201004 jkl

1003,0 kaka

同一區:

1004,1 201005 mon

1004,0 da

同一區:

1005,1 201002 def

1005,1 201006 pqr

1005,0 jue

同一區:

1006,1 201003 ghi

1006,0 zhao

2、分組操作,**如下

public static class example_join_01_comparator extends writablecomparator

@suppresswarnings("unchecked")

public int compare(writablecomparable a, writablecomparable b)

}分組操作就是把在相同分割槽的資料按照指定的規則進行分組的操作,就以上來看,是按照復合key的第乙個欄位做分組原則。輸出後結果如下:

同一組:

1003,0 kaka

1003,0 201001 abc

1003,0 201004 jkl

同一組:

1004,0 da

1004,0 201005 mon

同一組:

1005,0 jue

1005,0 201002 def

1005,0 201006 pqr

同一組:

1006,0 zhao

1006,0 201003 ghi

六、reduce操作

貼上**如下:

public static class example_join_01_reduce extends reducer}}

1、**比較簡單,首先獲取關鍵的id值,就是key的第乙個字段。

2、獲取公用的字段,通過排組織後可以看到,一些共有欄位是在第一位,取出來即可。

3、遍歷餘下的結果,輸出。

七、其他的支撐**

1、首先是textpair**,沒有什麼可以細說的,貼出來:

public class textpair implements writablecomparable

public textpair(string first, string second)

public textpair(text first, text second)

public void set(text first, text second)

public text getfirst()

public text getsecond()

public void write(dataoutput out) throws ioexception

public void readfields(datainput in) throws ioexception

public int compareto(textpair tp)

return second.compareto(tp.second);}}

2、job的入口函式

public static void main(string agrs) throws ioexception, interruptedexception, classnotfoundexception

"hadoop.

job.ugi", "root,hadoop");

job job = new job(conf, "example_join_01");

// 設定

執行的job

job.setjarbyclass(example_join_01.class);

// 設定map的輸出

job.setmapoutputkeyclass(textpair.class);

job.setmapoutputvalueclass(text.class);

// 設定partition

job.setpartitionerclass(example_join_01_partitioner.class);

// 在分割槽之後按照指定的條件分組

job.setgroupingcomparatorclass(example_join_01_comparator.class);

// 設定reduce

job.setreducerclass(example_join_01_reduce.class);

// 設定reduce的輸出

job.setoutputkeyclass(text.class);

job.setoutputvalueclass(text.class);

// 設定輸入和輸出的目錄

fileinputformat.addinputpath(job, new path(otherargs[0]));

fileinputformat.addinputpath(job, new path(otherargs[1]));

fileoutputformat.setoutputpath(job, new path(otherargs[2]));

// 執行,直到結束就退出

system.exit(job.waitforcompletion(true) ? 0 : 1);

}八、總結

1、這是個簡單的join查詢,可以看到,我在處理輸入源的時候是在map端做**判斷。其實在0.19可以用multipleinputs.addinputpath()的方法,但是它用了jobconf做引數。這個方法原理是多個資料來源就採用多個map來處理。方法各有優劣。

2、對於資源表,如果我們採用0和1這樣的模式來區分,資源表是需要放在前的。例如本例中info.txt就是資源表,所以標識位就是0.如果寫為1的話,可以試下,在分組之後,資源表對應的值放在了迭代器最後一位,無法追加在最後所有的結果集合中。

3、關於分割槽,並不是所有的map都結束才開始的,一部分資料完成就會開始執行。同樣,分組操作在乙個分區內執行,如果分割槽完成,分組將會開始執行,也不是等所有分割槽完成才開始做分組的操作。

內聯結 外聯結 左聯結 右聯結

1 內聯結 將兩個表中存在聯結關係的字段符合聯結關係的那些記錄形成記錄集的聯結。2 外聯結 分為外左聯結和外右聯結。左聯結a b表的意思 就是將表a中的全部記錄和表b中聯結的字段與表a的聯結字段符合聯結條件的那些記錄形成的記錄集的聯結,這裡注意的是最後出來的記錄集會包括表a的全部記錄。右聯結a b表...

11 半聯結 反聯結

半聯結 和 反聯結是 oracle 優化器能夠選擇用來在獲取資訊時應用的兩個密切相關的聯結方法 實際上是聯結方法的選項 半聯結in 的半聯結 select using in department name from hr.departments dept where department id in...

MySQL的內部聯結,外部聯結和自然聯結

一 select語句 乙個典型的sql查詢語句具有如下形式 select a1,a2,an from r1,r2,rm where p select子句列出查詢結果中所需要的屬性。from子句是乙個查詢求值中需要訪問的關係列表。where子句是乙個作用在from子句關係屬性上的謂詞。其中from這個...