HBase BulkLoad批量寫入資料實戰

2021-09-07 19:38:08 字數 4629 閱讀 9327

在進行資料傳輸中,批量載入資料到hbase集群有多種方式,比如通過hbase api進行批量寫入資料、使用sqoop工具批量導數到hbase集群、使用mapreduce批量匯入等。這些方式,在匯入資料的過程中,如果資料量過大,可能耗時會比較嚴重或者占用hbase集群資源較多(如磁碟io、hbase handler數等)。今天這篇部落格筆者將為大家分享使用hbase bulkload的方式來進行海量資料批量寫入到hbase集群。

在使用bulkload之前,我們先來了解一下hbase的儲存機制。hbase儲存資料其底層使用的是hdfs來作為儲存介質,hbase的每一張表對應的hdfs目錄上的乙個資料夾,資料夾名以hbase表進行命名(如果沒有使用命名空間,則預設在default目錄下),在表資料夾下存放在若干個region命名的資料夾,region資料夾中的每個列簇也是用資料夾進行儲存的,每個列簇中儲存就是實際的資料,以hfile的形式存在。路徑格式如下:

/hbase/data/default////
按照hbase儲存資料按照hfile格式儲存在hdfs的原理,使用mapreduce直接生成hfile格式的資料檔案,然後在通過regionserver將hfile資料檔案移動到相應的region上去。流程如下圖所示:

hfile檔案的生成,可以使用mapreduce來進行實現,將資料來源準備好,上傳到hdfs進行儲存,然後在程式中讀取hdfs上的資料來源,進行自定義封裝,組裝rowkey,然後將封裝後的資料在回寫到hdfs上,以hfile的形式儲存到hdfs指定的目錄中。實現**如下:

/*

* * read datasource from hdfs & gemerator hfile.

* * @author smartloli.

* * created by aug 19, 2018 */

public class gemeratorhfile2

}public static void main(string args)

configuration conf =new configuration();

conf.addresource(new path(args[

0]));

conf.set(

"hbase.fs.tmp.dir

", "

partitions_

" +uuid.randomuuid());

string tablename = "

person";

string input = "

hdfs://nna:9000/tmp/person.txt";

string output = "

hdfs://nna:9000/tmp/pres";

system.out.println(

"table :

" +tablename);

htable table;

try catch (ioexception e1)

connection conn =connectionfactory.createconnection(conf);

table =(htable) conn.gettable(tablename.valueof(tablename));

job job =job.getinstance(conf);

job.setjobname(

"generate hfile");

job.setjarbyclass(gemeratorhfile2.class);

job.setinputformatclass(textinputformat.class);

fileinputformat.setinputpaths(job, input);

fileoutputformat.setoutputpath(job, new path(output));

hfileoutputformat2.configureincrementalload(job, table);

try catch (interruptedexception e) catch (classnotfoundexception e)

} catch (exception e) }}

在hdfs目錄/tmp/person.txt中,準備資料來源如下:

1 smartloli 100

2 smartloli2 101

3 smartloli3 102

然後,將上述**編譯打包成jar,上傳到hadoop集群進行執行,執行命令如下:

如果在執行命令的過程中,出現找不到類的異常資訊,可能是本地沒有載入hbase依賴jar包,在當前使用者中配置如下環境變數資訊:

export hadoop_classpath=$hbase_home/lib/*

:classpath

然後,執行source命令使配置的內容立即生生效。

在成功提交任務後,linux控制台會列印執行任務進度,也可以到yarn的資源監控介面檢視執行進度,結果如下所示:

等待任務的執行,執行完成後,在對應hdfs路徑上會生成相應的hfile資料檔案,如下圖所示:

然後,在使用bulkload的方式將生成的hfile檔案匯入到hbase集群中,這裡有2種方式。一種是寫**實現匯入,另一種是使用hbase命令進行匯入。

通過loadincrementalhfiles類來實現匯入,具體**如下:

/*

** use bulkload inport hfile from hdfs to hbase.

* * @author smartloli.

** created by aug 19, 2018

*/public class bulkload2hbase

string output = "

hdfs://cluster1/tmp/pres";

configuration conf =new configuration();

conf.addresource(new path(args[

0]));

htable table = new htable(conf, "

person");

loadincrementalhfiles loader =new loadincrementalhfiles(conf);

loader.dobulkload(new path(output), table);}}

執行上述**,執行結果如下:

先將生成好的hfile檔案遷移到目標集群(即hbase集群所在的hdfs上),然後在使用hbase命令進行匯入,執行命令如下:

# 先使用distcp遷移hfile

hadoop distcp -dmapreduce.job.queuename=queue_1024_01 -update -skipcrccheck -m 10 /tmp/pres hdfs://

nns:9000/tmp/pres

# 使用bulkload方式匯入資料

hbase org.apache.hadoop.hbase.mapreduce.loadincrementalhfiles /tmp/pres person

最後,我們可以到指定的regionserver節點上檢視匯入的日誌資訊,如下所示為匯入成功的日誌資訊:

2018-08-19

16:30:34,969 info [b.defaultrpcserver.handler=7,queue=1,port=16020] regionserver.hstore: successfully loaded store file hdfs://

cluster1/tmp/pres/cf/7b455535f660444695589edf509935e9 into store cf (new location: hdfs:

//cluster1/hbase/data/default/person/2d7483d4abd6d20acdf16533a3fdf18f/cf/d72c8846327d42e2a00780ac2facf95b_seqid_4_)

使用bulkload方式匯入資料後,可以進入到hbase集群,使用hbase shell來檢視資料是否匯入成功,預覽結果如下:

本篇部落格為了演示實戰效果,將生成hfile檔案和使用bulkload方式匯入hfile到hbase集群的步驟進行了分解,實際情況中,可以將這兩個步驟合併為乙個,實現自動化生成與hfile自動匯入。如果在執行的過程**現rpcretryingcaller的異常,可以到對應regionserver節點檢視日誌資訊,這裡面記錄了出現這種異常的詳細原因。

HBase Bulk Load的基本使用

最近在專案中需要將大量存在於文字檔案中的資料批量匯入到hbase表中,並且涉及到將多個文字檔案中的資料合併到同一張hbase表中,而這些文字檔案中的字段並不是完全相同的 因為專案當中查詢資料採用的是phoenix,因此,最初想利用phoenix的bulk csv data loading來實現,但是...

mybatis批量查詢,批量新增,批量更新

一 多條件批量查詢 先上 再講解 select from ifs company where id and code id標籤不用多說,和dao方法一一對應。parametertype標籤寫list就可以,如果是其他型別對應寫就可以。resultmap,自己定義的字段實體類對應。二 批量新增 先上 ...

批量匯入是批量修改還是批量新增

1 一般基礎資料資訊的管理功能包括 新增 修改 刪除 查詢 匯入 匯出,比如物料資訊維護 這裡說到的匯入即相對於新增來說,即批量新增的功能 2 當我所有的資料資訊都完善了的情況下,由於業務的變更,需要給這些基礎資料資訊新增乙個字段資訊a 這個時候怎麼辦?3 注 在新增 修改 匯入模組都增加了字段資訊...