mapreduce 二次排序

2022-09-08 02:03:10 字數 3949 閱讀 2580

所謂二次排序,對第1個字段相同的資料,使用第2個字段進行排序。

舉個例子,電商平台記錄了每一使用者的每一筆訂單的訂單金額,現在要求屬於同乙個使用者的所有訂單金額作排序,並且輸出的使用者名稱也要排序。

賬戶訂單金額

hadoop@apache

200hive@apache

550yarn@apache

580hive@apache

159hadoop@apache

300hive@apache

258hadoop@apache

300yarn@apache

100hadoop@apache

150yarn@apache

560yarn@apache

260二次排序後的結果

賬戶訂單金額

hadoop@apache

150hadoop@apache

200hadoop@apache

300hadoop@apache

300hive@apache

159hive@apache

258hive@apache

550yarn@apache

100yarn@apache

260yarn@apache

560yarn@apache

580實現的思路是使用自定義key,key中實現按使用者名稱和訂單金額2個字段的排序,自定義分割槽和分組類,按使用者名稱進行分割槽和分組。自定義排序的比較器,分別用於在map端和reduce的合併排序。

因為hadoop預設使用的字串序列化j**a.io.dataoutputstream.writeutf(), 使用了"變種的utf編碼",序列化後的位元組流不能在rawcomparator使用。

在實現中,用一種變通的方法,直接使用「賬戶」欄位的位元組流,並且把位元組流長度也一併序列化。rawcomparator得到的位元組流就是我們寫進去的位元組流。當然,在進行反序列化時,需要根據這個長度來讀出「賬戶」字段。

程式**

package com.hadoop;

import j**a.io.datainput;

import j**a.io.dataoutput;

import j**a.io.ioexception;

import j**a.nio.charset.charset;

import org.apache.hadoop.conf.configuration;

import org.apache.hadoop.conf.configured;

import org.apache.hadoop.fs.path;

import org.apache.hadoop.io.doublewritable;

import org.apache.hadoop.io.longwritable;

import org.apache.hadoop.io.text;

import org.apache.hadoop.io.writablecomparable;

import org.apache.hadoop.io.writablecomparator;

import org.apache.hadoop.mapreduce.job;

import org.apache.hadoop.mapreduce.partitioner;

import org.apache.hadoop.mapreduce.reducer;

import org.apache.hadoop.mapreduce.lib.input.fileinputformat;

import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;

import org.apache.hadoop.security.usergroupinformation;

import org.apache.hadoop.util.tool;

import org.apache.hadoop.util.toolrunner;

public class secondarysortmapreduce extends configured implements tool

public string getaccount()

public double getcost()

@override

public void write(dataoutput out) throws ioexception

@override

public void readfields(datainput in) throws ioexception

@override

public int compareto(costbean o)

return account.compareto(o.account);

} @override

public string tostring() }

/*** 用於map端和reduce端排序的比較器:如果賬戶相同,則比較金額

* @author ivan

* */

public static class costbeancomparator extends writablecomparator else

} }/**

* 用於map端在寫磁碟使用的分割槽器

* @author ivan

* */

public static class costbeanpatitioner extends partitioner }

/*** 用於在reduce端分組的比較器根據account欄位分組,即相同account的作為一組

* @author ivan

* */

public static class groupcomparator extends writablecomparator }

/*** @author ivan

* */

private final costbean outputkey = new costbean();

private final doublewritable outputvalue = new doublewritable();

@override

protected void map(longwritable key, text value, context context)

throws ioexception, interruptedexception }

public static class secondarysortreducer extends reducer

} }public int run(string args) throws exception

/*** @param args

* @throws exception

*/public static void main(string args) throws exception

toolrunner.run(new configuration(), new secondarysortmapreduce(), args);

}}

執行環境

拿上面的例子作為測試資料

賬戶金額

hadoop@apache

200hive@apache

550yarn@apache

580hive@apache

159hadoop@apache

300hive@apache

258hadoop@apache

300yarn@apache

100hadoop@apache

150yarn@apache

560yarn@apache

260

MapReduce二次排序

預設情況下,map輸出的結果會對key進行預設的排序,但個別需求要求對key排序的同時還需要對value進行排序 這時候就要用到二次排序了。本章以hadoop權威指南中計算每年最大氣溫值為例,原始資料雜亂無章 2008 33 2008 23 2008 43 2008 24 2008 25 2008 ...

Map reduce二次排序

map reduce的流程切面 splitmapperpartitioncombinergroupreducer 這裡要解釋下 partition 和 group 它們都是shuffle的重要步驟 的區別.他們的作用都是為了reducer分配記錄去處理.但區別是partition是把記錄分給不同的r...

mapreduce二次排序案例

為什麼需要二次排序?在mapreduce操作時,我們知道傳遞的會按照key的大小進行排序,最後輸出的結果是按照key排過序的。有的時候我們在key排序的基礎上,對value也進行排序。這種需求就是二次排序 解決思路 我們可以把key和value聯合起來作為新的key,記作newkey。這時,newk...