Hive RCFile合併作業產生重複資料問題

2021-06-14 22:28:32 字數 3043 閱讀 5272

前幾天有dw使用者反饋,在往一張表(rcfile表)中用「insert overwrite table partition(xx) select ...」 插入資料的時候,會產生重複檔案。看了下這個作業log,發現map task 000005起了兩個task attempt ,第二個attempt是推測執行,並且這兩個attemp都在task close函式裡面重新命名temp檔案成正式檔案,而不是通過mapreduce框架的兩階段提交協議(two phrase commit protocol)在收到tasktracker發過來的committaskaction時再commit task來保證只有乙個attemp的結果成為正式結果。

task log中的輸出如下:

attempt_201304111550_268224_m_000005_0

renamed path hdfs: to hdfs: . file size is 666922

attempt_201304111550_268234_m_000005_1

renamed path hdfs: to hdfs: . file size is 666922

public void close() throws ioexception 

outwriter.close();

outwriter = null;

if (!exception)

} else

}}

finally 

if (rj != null)

hadoopjobexechelper.runningjobkilluris.remove(rj.getjobid());

jobid = rj.getid().tostring();

}} catch (exception e)

}

jobclose方法

public static void jobclose(string outputpath, boolean success, jobconf job,

loghelper console, dynamicpartitionctx dynpartctx) throws hiveexception, ioexception

}

mvfiletofinalpath方法

public static void mvfiletofinalpath(string specpath, configuration hconf,

boolean success, log log, dynamicpartitionctx dpctx, filesinkdesc conf) throws ioexception,

hiveexception

// step3: move to the file destination

log.info("moving tmp dir: " + intermediatepath + " to: " + finalpath);

utilities.renameormovefiles(fs, intermediatepath, finalpath);

}} else

fs.delete(tasktmppath, true);

}

removetemporduplicatefiles(filesystem fs, path path, dynamicpartitionctx dpctx),動態分割槽和非動態分割槽表不同處理邏輯

/**

* remove all temporary files and duplicate (double-committed) files from a given directory.

** @return a list of path names corresponding to should-be-created empty buckets.

*/public static arraylistremovetemporduplicatefiles(filesystem fs, path path,

dynamicpartitionctx dpctx) throws ioexception

arraylistresult = new arraylist();

if (dpctx != null)

}taskidtofile = removetemporduplicatefiles(items, fs);

// if the table is bucketed and enforce bucketing, we should check and generate all buckets

if (dpctx.getnumbuckets() > 0 && taskidtofile != null) }}

}} else

return result;

}

removetemporduplicatefiles(filestatus items, filesystem fs),對於每個目錄下相同taskid不同attemptid的檔案進行去重

public static hashmapremovetemporduplicatefiles(filestatus items,

filesystem fs) throws ioexception

hashmaptaskidtofile = new hashmap();

for (filestatus one : items)

} else else else

long len1 = todelete.getlen();

long len2 = taskidtofile.get(taskid).getlen();

if (!fs.delete(todelete.getpath(), true)) else }}

}return taskidtofile;

}

作業 合併果子 fruit

題目描述 在乙個果園裡,多多已經將所有的果子打了下來,而且按果子的不同種類分成了不同的堆。多多決定把所有的果子合成一堆。每一次合併,多多可以把兩堆果子合併到一起,消耗的體力等於兩堆果子的重量之和。可以看出,所有的果子經過n 1次合併之後,就只剩下一堆了。多多在合併果子時總共消耗的體力等於每次合併所耗...

C 作業6 陣列合併 矩陣求和

專案2 陣列合併 已知有兩個有序的陣列a,b,將這兩個陣列合併到陣列c中,陣列c依然有序,如a 5 b 5 則c 10 include using namespace std int main b 5 int c 10 i,j,t for i 0 i 5 i c i a i for i 5,j 0 ...

C 第六次作業 陣列合併,矩陣求和

一 問題及 專案二 陣列合併 檔名稱 ra.cpp 作 者 湯永泰 完成日期 2017 年 5 月 10 日 版 本 號 v1.0 對任務及求解方法的描述部分 輸入描述 無 問題描述 第六次作業 專案二 陣列合併 程式輸出 略 問題分析 略 演算法設計 略 includeusing namespac...