MR 二次排序

2021-09-24 18:30:16 字數 3092 閱讀 7927

二次排序

eg: 對左側序列進行排序, 需要先對字母排序, 然後在對數字進行排序. 得到右側的序列.

a 2 a 2

c 4 a 4

b 3 b 1

c 1 => b 3

a 4 c 1

b 1 c 4

public class sortmapreduce extends configured implements tool

}// reducer,map的輸出就是我們reduce的輸入

public static class sortreducer extends reducer

collections.sort(valuelist);//資源開銷過大 // 排序

for(integer value : valueslist) }}

//driver:任務相關設定

public int run(string args) throws exception

public static void main(string args) throws exception ;

//將任務跑起來

//int statas = new wordcountmapreduce().run(args);

int statas = toolrunner.run(conf, new sortmapreduce(), args);

//關閉我們的job

system.exit(statas);

}}

public class pairwritable implements writablecomparable

public pairwritable(string first, int second)

public void set(string first, int second)

public string getfirst()

public void setfirst(string first)

public int getsecond()

public void setsecond(int second)

public void write(dataoutput out) throws ioexception

public void readfields(datainput in) throws ioexception

public int compareto(pairwritable o)

return integer.valueof(getsecond()).compareto(

integer.valueof(o.getsecond()));

}@override

public int hashcode()

@override

public boolean equals(object obj) else if (!first.equals(other.first))

return false;

if (second != other.second)

return false;

return true;

}@override

public string tostring()

}

map reduce實現. sortmapreduce

public class sortmapreduce extends configured implements tool

mapoutputkey.set(strs[0], integer.valueof(strs[1]));

mapoutputvalue.set(integer.valueof(strs[1]));

context.write(mapoutputkey, mapoutputvalue);}}

// reducer,map的輸出就是我們reduce的輸入

public static class sortreducer extends reducer}}

// driver:任務相關設定

public int run(string args) throws exception

public static void main(string args) throws exception ;

//將任務跑起來

//int statas = new wordcountmapreduce().run(args);

int statas = toolrunner.run(conf, new sortmapreduce(), args);

//關閉我們的job

system.exit(statas);

}}

關於shuffle優化. firstpartitioner, 以原始的 a,b,c的key作為排序依據. 不是以pairwritable進行排序切分的依據.

public class firstpartitioner extends partitioner

}

構建分組的方式 firstgroupingcomparator. 這也是發生在 shuffle階段.

public class firstgroupingcomparator implements rawcomparator

// b 表示要比較的兩個位元組陣列

// s 第乙個位元組陣列的進行比較的尾部位置, 位元組陣列中位元組的偏移量.

// l 第乙個位元組比到組合pairwritable的前乙個位元組. l‐4 是表示去除最後乙個位元組所佔記憶體的大小, 因為int

型別占用 4 個位元組的大小.

public int compare(byte b1, int s1, int l1, byte b2, int s2, int l2)

}

Hadoop Streaming二次排序

由於hadoop機器記憶體不足,所以需要把資料mapred進來跑。這樣,就需要,同乙個key下的輸入資料是有序的,即 對於keya的資料,要求data1先來,之後data2再來 所以需要對data進行二次排序。d stream.num.map.output.key.fields 2 這個,可以設定在...

MapReduce二次排序

預設情況下,map輸出的結果會對key進行預設的排序,但個別需求要求對key排序的同時還需要對value進行排序 這時候就要用到二次排序了。本章以hadoop權威指南中計算每年最大氣溫值為例,原始資料雜亂無章 2008 33 2008 23 2008 43 2008 24 2008 25 2008 ...

Map reduce二次排序

map reduce的流程切面 splitmapperpartitioncombinergroupreducer 這裡要解釋下 partition 和 group 它們都是shuffle的重要步驟 的區別.他們的作用都是為了reducer分配記錄去處理.但區別是partition是把記錄分給不同的r...