Hive RCFile合併作業產生重複資料問題

2021-06-14 22:28:32 字數 3043 閱讀 5272

前幾天有dw使用者反饋,在往一張表(rcfile表)中用「insert overwrite table partition(xx) select ...」 插入資料的時候,會產生重複檔案。看了下這個作業log,發現map task 000005起了兩個task attempt ,第二個attempt是推測執行,並且這兩個attemp都在task close函式裡面重新命名temp檔案成正式檔案,而不是通過mapreduce框架的兩階段提交協議(two phrase commit protocol)在收到tasktracker發過來的committaskaction時再commit task來保證只有乙個attemp的結果成為正式結果。

task log中的輸出如下:


renamed path hdfs: to hdfs: . file size is 666922


renamed path hdfs: to hdfs: . file size is 666922

public void close() throws ioexception 


outwriter = null;

if (!exception)

} else



if (rj != null)


jobid = rj.getid().tostring();

}} catch (exception e)



public static void jobclose(string outputpath, boolean success, jobconf job,

loghelper console, dynamicpartitionctx dynpartctx) throws hiveexception, ioexception



public static void mvfiletofinalpath(string specpath, configuration hconf,

boolean success, log log, dynamicpartitionctx dpctx, filesinkdesc conf) throws ioexception,


// step3: move to the file destination"moving tmp dir: " + intermediatepath + " to: " + finalpath);

utilities.renameormovefiles(fs, intermediatepath, finalpath);

}} else

fs.delete(tasktmppath, true);


removetemporduplicatefiles(filesystem fs, path path, dynamicpartitionctx dpctx),動態分割槽和非動態分割槽表不同處理邏輯


* remove all temporary files and duplicate (double-committed) files from a given directory.

** @return a list of path names corresponding to should-be-created empty buckets.

*/public static arraylistremovetemporduplicatefiles(filesystem fs, path path,

dynamicpartitionctx dpctx) throws ioexception

arraylistresult = new arraylist();

if (dpctx != null)

}taskidtofile = removetemporduplicatefiles(items, fs);

// if the table is bucketed and enforce bucketing, we should check and generate all buckets

if (dpctx.getnumbuckets() > 0 && taskidtofile != null) }}

}} else

return result;


removetemporduplicatefiles(filestatus items, filesystem fs),對於每個目錄下相同taskid不同attemptid的檔案進行去重

public static hashmapremovetemporduplicatefiles(filestatus items,

filesystem fs) throws ioexception

hashmaptaskidtofile = new hashmap();

for (filestatus one : items)

} else else else

long len1 = todelete.getlen();

long len2 = taskidtofile.get(taskid).getlen();

if (!fs.delete(todelete.getpath(), true)) else }}

}return taskidtofile;


作業 合併果子 fruit

題目描述 在乙個果園裡,多多已經將所有的果子打了下來,而且按果子的不同種類分成了不同的堆。多多決定把所有的果子合成一堆。每一次合併,多多可以把兩堆果子合併到一起,消耗的體力等於兩堆果子的重量之和。可以看出,所有的果子經過n 1次合併之後,就只剩下一堆了。多多在合併果子時總共消耗的體力等於每次合併所耗...

C 作業6 陣列合併 矩陣求和

專案2 陣列合併 已知有兩個有序的陣列a,b,將這兩個陣列合併到陣列c中,陣列c依然有序,如a 5 b 5 則c 10 include using namespace std int main b 5 int c 10 i,j,t for i 0 i 5 i c i a i for i 5,j 0 ...

C 第六次作業 陣列合併,矩陣求和

一 問題及 專案二 陣列合併 檔名稱 ra.cpp 作 者 湯永泰 完成日期 2017 年 5 月 10 日 版 本 號 v1.0 對任務及求解方法的描述部分 輸入描述 無 問題描述 第六次作業 專案二 陣列合併 程式輸出 略 問題分析 略 演算法設計 略 includeusing namespac...