MapReduce讀取資料

2021-09-02 03:38:53 字數 3723 閱讀 1455

下圖列出了涉及mapreduce讀取資料的幾個核心類以及常見的幾種擴充套件。

如圖所示,inputformat類抽象了兩個方法,建立分片的getsplit( )和建立資料讀取工具的createrecordreader( ),可以擴充套件inputformat重寫這兩個方法來實現不同資料來源讀取資料,或者採用不同的方式讀取資料。

inputsplit表示資料的邏輯分片,最常見的是用於表示文字檔案分片的filesplit類,該類擴充套件了inputspit,包含了檔案的路徑、分片起始位置在原始檔中的位元組偏移量、分片的位元組長度以及分片所屬檔案塊存在資料節點資訊。用於分片資訊在客戶端提交作業時會被序列化到檔案然後提交,並且在作業執行中會被反序列化,所以filesplit還實現了writable介面,實現了write(dataoutput out)和readfields(datainput in)兩個方法。

正真為mapreduce提供資料輸入的是recordreader類,給它分配乙個分片,它就從資料來源中讀取分片指定的資料段、並將資料組織成指定的資料結構。

hadoop預設提供的一些資料讀取類基本可以滿足多數需求,特殊情況下我們也可以自己擴充套件。以下用乙個簡單了例子介紹擴充套件方式。

擴充套件的目的是從多個不同型別的資料庫(mysql、oracle、db2等)、或者多張表讀取資料。**如下:

public class multitableinputsplit extends dbinputsplit implements writable

public multitableinputsplit(long start, long end, string intputsql,

string dbconurl, string username, string password, string dbtype)

@override

public void write(dataoutput output) throws ioexception

@override

public void readfields(datainput input) throws ioexception

//get、set等方法省略

}

multiinputsplit 類間接擴充套件了inputsplit類,新增了資料庫連線資訊和查詢資料所使用的sql語句。

public class multitableinputformat extends inputformat

return splits;

} @override

public recordreadercreaterecordreader(

inputsplit split, taskattemptcontext context) throws ioexception,

interruptedexception catch (sqlexception ex)

} /**

* 可以根據表的數量、表的大小控制分片的數量

*/private int getsplitcount(string sqlinfo)

/*** 計算分片的大小

*/private int getsplitsize(string sqlinfo)

public void getsplit(string inputquery, jobcontext job, listsplits) else

splits.add(split);

} }}

class multitablerecordreader extends recordreader

@override

public boolean nextkeyvalue() throws ioexception

if (value == null)

if (null == this.results)

if (!results.next())

key.set(pos + split.getstart());

value.readfields(results);

pos++;

} catch (sqlexception e)

return true; }

@override

public longwritable getcurrentkey()

@override

public mapdbwritable getcurrentvalue()

@override

public float getprogress() throws ioexception

@override

public void initialize(inputsplit split, taskattemptcontext context)

throws ioexception, interruptedexception

/*** 根據不同的資料庫型別實現不同的分頁查詢

*/protected string getselectquery() if(dbtype.equalsignorecase("oracle")) else

}} catch (ioexception ex)

return query.tostring();

} public void initconnection() throws sqlexception

@override

public void close() throws ioexception

if (null != statement)

if (null != connection)

} catch (sqlexception e)

} protected resultset executequery(string query) throws sqlexception

public connection getconnection()

public dbinputformat.dbinputsplit getsplit()

protected void setstatement(preparedstatement stmt)

}

public class mapdbwritable implements dbwritable

public void readfields(resultset resultset) throws sqlexception

} @override

public string tostring()

return builder.substring(0, builder.length() - 1);

} public void write(preparedstatement preparedstatement) throws sqlexception

public mapgetvalues()

public void setvalues(mapvalues)

public string getcolnames()

public void setcolnames(string colnames)

public mapgetcoltype()

public void setcoltype(mapcoltype)

}

通過擴充套件以上幾個類就可以從多個資料庫、多個表讀取資料了。

mapreduce資料傾斜

前言 資料傾斜是日常大資料查詢中 的乙個bug,遇不到它時你覺得資料傾斜也就是書本部落格上的乙個無病呻吟的偶然案例,但當你遇到它是你就會懊悔當初怎麼不多了解一下這個赫赫有名的事故。當然你和資料傾斜的緣分深淺還是看你公司的業務邏輯和資料量有沒有步入資料傾斜的領地。說明 關於資料傾斜的產生原因我將結合 ...

HBASE 資料操作,MapReduce

前面已經對hbase有了不少了解了,這篇重點在實踐操作。hbase本身是乙個很好的key value的儲存系統,但是也不是萬能的,很多時候還是要看用在什麼情形,怎麼使用。kv之類的資料庫就是要應用在這類快速查詢的應用上,而不是像傳統的sql那樣關聯查詢,分組計算,這些可就不是hbase的長處了。下面...

大資料之Map reduce

大資料問題一般解決方式 利用雜湊函式進行分流來解決記憶體限制或者其他限制的問題。1.雜湊函式又叫雜湊函式,雜湊函式的輸入域可以是非常大的範圍,但是輸出域是固定範圍。假設為s。雜湊函式的性質 1.典型的雜湊函式都擁有無限的輸入值域。2.輸入值相同時 返回值一樣。3.輸入值不同時,返回值可能一樣,也可能...