03 Mapreduce例項 排序

2022-09-08 03:39:09 字數 3891 閱讀 5405

實驗原理

map、reduce任務中shuffle和排序的過程圖如下:

流程分析:

1.map端:

(1)每個輸入分片會讓乙個map任務來處理,預設情況下,以hdfs的乙個塊的大小(預設為64m)為乙個分片,當然我們也可以設定塊的大小。map輸出的結果會暫且放在乙個環形記憶體緩衝區中(該緩衝區的大小預設為100m,由io.sort.mb屬性控制),當該緩衝區快要溢位時(預設為緩衝區大小的80%,由io.sort.spill.percent屬性控制),會在本地檔案系統中建立乙個溢位檔案,將該緩衝區中的資料寫入這個檔案。

(2)在寫入磁碟之前,執行緒首先根據reduce任務的數目將資料劃分為相同數目的分割槽,也就是乙個reduce任務對應乙個分割槽的資料。這樣做是為了避免有些reduce任務分配到大量資料,而有些reduce任務卻分到很少資料,甚至沒有分到資料的尷尬局面。其實分割槽就是對資料進行hash的過程。然後對每個分割槽中的資料進行排序,如果此時設定了combiner,將排序後的結果進行combia操作,這樣做的目的是讓盡可能少的資料寫入到磁碟。

(3)當map任務輸出最後乙個記錄時,可能會有很多的溢位檔案,這時需要將這些檔案合併。合併的過程中會不斷地進行排序和combia操作,目的有兩個:①儘量減少每次寫入磁碟的資料量。②儘量減少下一複製階段網路傳輸的資料量。最後合併成了乙個已分割槽且已排序的檔案。為了減少網路傳輸的資料量,這裡可以將資料壓縮,只要將mapred.compress.map.out設定為true就可以了。

(4)將分割槽中的資料拷貝給相對應的reduce任務。有人可能會問:分割槽中的資料怎麼知道它對應的reduce是哪個呢?其實map任務一直和其父tasktracker保持聯絡,而tasktracker又一直和jobtracker保持心跳。所以jobtracker中儲存了整個集群中的巨集觀資訊。只要reduce任務向jobtracker獲取對應的map輸出位置就ok了哦。

到這裡,map端就分析完了。那到底什麼是shuffle呢?shuffle的中文意思是「洗牌」,如果我們這樣看:乙個map產生的資料,結果通過hash過程分割槽卻分配給了不同的reduce任務,是不是乙個對資料洗牌的過程呢?

2.reduce端:

(1)reduce會接收到不同map任務傳來的資料,並且每個map傳來的資料都是有序的。如果reduce端接受的資料量相當小,則直接儲存在記憶體中(緩衝區大小由mapred.job.shuffle.input.buffer.percent屬性控制,表示用作此用途的堆空間的百分比),如果資料量超過了該緩衝區大小的一定比例(由mapred.job.shuffle.merge.percent決定),則對資料合併後溢寫到磁碟中。

(2)隨著溢寫檔案的增多,後台執行緒會將它們合併成乙個更大的有序的檔案,這樣做是為了給後面的合併節省時間。其實不管在map端還是reduce端,mapreduce都是反覆地執行排序,合併操作,現在終於明白了有些人為什麼會說:排序是hadoop的靈魂。

(3)合併的過程中會產生許多的中間檔案(寫入磁碟了),但mapreduce會讓寫入磁碟的資料盡可能地少,並且最後一次合併的結果並沒有寫入磁碟,而是直接輸入到reduce函式。

熟悉mapreduce的人都知道:排序是mapreduce的天然特性!在資料達到reducer之前,mapreduce框架已經對這些資料按鍵排序了。但是在使用之前,首先需要了解它的預設排序規則。它是按照key值進行排序的,如果key為封裝的int為intwritable型別,那麼mapreduce按照數字大小對key排序,如果key為封裝string的text型別,那麼mapreduce將按照資料字典順序對字元排序。

了解了這個細節,我們就知道應該使用封裝int的intwritable型資料結構了,也就是在map這裡,將讀入的資料中要排序的字段轉化為intwritable型,然後作為key值輸出(不排序的字段作為value)。reduce階段拿到之後,將輸入的key作為的輸出key,並根據value-list中的元素的個數決定輸出的次數。

實驗步驟

1.在linux中開啟hadoop

start-all.sh  

2.在linux本地新建/data/mapreduce3目錄。

mkdir -p /data/mapreduce3

unzip hadoop2lib.zip

4.在hdfs上新建/mymapreduce3/in目錄,然後將linux本地/data/mapreduce3目錄下的goods_click檔案匯入到hdfs的/mymapreduce3/in目錄中。

hadoop fs -mkdir -p /mymapreduce3/in 

hadoop fs -put /data/mapreduce3/goods_click /mymapreduce3/in

注意:檔案需要注意檔案格式,資料後有隱藏的空格會導致api中讀取失敗,行末尾的空格應該取消掉,中間使用逗號分隔開

5.在idea中編寫**

package

mapreduce3;

import

j**a.io.ioexception;

import

org.apache.hadoop.conf.configuration;

import

org.apache.hadoop.fs.path;

import

org.apache.hadoop.io.intwritable;

import

org.apache.hadoop.io.text;

import

org.apache.hadoop.mapreduce.job;

import

import

org.apache.hadoop.mapreduce.reducer;

import

org.apache.hadoop.mapreduce.lib.input.fileinputformat;

import

org.apache.hadoop.mapreduce.lib.input.textinputformat;

import

org.apache.hadoop.mapreduce.lib.output.fileoutputformat;

import

org.apache.hadoop.mapreduce.lib.output.textoutputformat;

public

class

onesort

}public

static

class reduce extends reducer< intwritable, text, intwritable, text>}}

public

static

void main(string args) throws

ioexception, classnotfoundexception, interruptedexception

}

6.建立resources資料夾,其中建立log4j.properties檔案

hadoop.root.logger=debug, console

log4j.rootlogger =debug, console

7.匯入hadoop2lib的包

8.執行結果

ps:記得修改許可權

MapReduce計數例項

mapreduce 是hadoop的分布式計算系統,是乙個分布式運算程式的程式設計框架。為什麼需要mapreduce 本例項是基於hadoop2.8.5的偽分布式平台。如果jps命令執行後如下圖所示,則偽分布式搭建完成。hadoop偽分布式和完全分布式的搭建後續會進行更新?先將本地乙個檔案上傳到hd...

mapreduce應用例項

1 mapreduce是乙個程式設計模型,既不是平台也不是特定的語言。面向記錄的資料處理 鍵和值 便於跨多個節點分配任務 2 集群上的資源管理取決於版本 1 mapreduce v1 mrv1,經典mapreduce 使用jobtracker和tasktracker 架構 守護程序啟動和管理map任...

MapReduce例項 多表關聯

多表關聯和單錶關聯類似,它也是通過對原始資料進行一定的處理,從其中挖掘出關心的資訊。輸入是兩個檔案,乙個代表工廠表,包含工廠名列和位址編號列 另乙個代表位址表,包含位址名列和位址編號列。要求從輸入資料中找出工廠名和位址名的對應關係,輸出 工廠名 位址名 表。1.源資料 factory factory...