聊一聊Flink 寫 Hive 的小檔案問題

2021-10-21 20:20:07 字數 3039 閱讀 9060

flink 1.11 引入了寫hive的功能後,已經在上線了一段時間。下面就聊聊我自己對flink 寫hive 小檔案的一些問題和看法。

1.flink 寫 hive 可能會產生小檔案嗎?

簡單的說,答案是會。

簡單總結下flink 讀kafka寫hive的流程

1.flink 將kafka資料根據設定的分割槽策略,實時寫入對應分割槽hdfs 目錄的臨時檔案  inprogress,如下圖所示。

在inprogress檔案的資料,通過hive是無法查詢到的。

2.打checkpoint時,將inprogress檔案的資料刷到正式檔案中,並提交kafka offset。

這裡有乙個關鍵的點:flink中乙個寫hive的並行度,同一時間只能寫乙個hdfs檔案

那麼就有乙個問題,如果我設定3個並行度,乙個checkpoint週期是不是會生成3個檔案呢?

這裡有乙個關鍵的引數:'sink.shuffle-by-partition.enable'

以上圖hive表為例,一級分割槽為年月日,二級分割槽為小時,按照資料的event時間分割槽。

設定3個並行度,如下圖所示

我們看到,3個並行度都有資料在寫,那麼hdfs上對應就有3個inprogress檔案,checkpoint後會有對應的3個hdfs檔案。(1.12的小檔案合併功能下面再討論)

我們可以看到,即使設定了3個並行度,也只有乙個並行度有資料在寫。這樣的話,乙個checkpoint週期中,只會生成乙個hdfs檔案。

但是這裡有乙個前提,當前所有資料都落在統一分割槽中,即當前小時,也就是說沒有跨小時的延遲。

如果正好是跨小時的時候,由於我們使用的是eventtime 作為分割槽,例如10:00:01s時,既有10點的資料,也有9點的資料,那麼就會有多個並行度有資料在寫,這也很容易理解,因為不同分割槽,肯定是不同的hdfs檔案。

通過這個特點,在'sink.shuffle-by-partition.enable'=true的情況下,我們也很容易看出,資料落地hive是否有延遲。

那麼問題又來了。如果考慮到小檔案的問題,什麼情況下需要將'sink.shuffle-by-partition.enable' 設定為false?

我的答案是,當單個分割槽的寫入速度超過單個並行度寫入hive速度極限時,因為如果這時還設定為true,則會永遠反壓,消費的速度跟不上生產的速度。不過好在flink寫hive在1.11.3版本之後,效能還是不錯的。所以大部分情況,建議設定為true。

場景一

任務編號

業務資料量

寫hive單個並行度極限

並行度sink.shuffle-by-partition.enable

110w/s

4w/s

3+flase

23w/s

4w/s

2+true

任務1: 10w資料會被平均分配到3個hdfs檔案中,每個檔案資料行數為3.3w/s*checkpoint週期。

任務2:在資料不跨小時延遲的情況下,3w資料會在1個hdfs檔案中,每個檔案資料行數為3w/s*checkpoint週期。

再通過設定checkpoint週期,我們可以大大減少小檔案產生的概率。

但是如果時間到了凌晨,資料量很少的情況

場景二

任務編號

業務資料量

寫hive單個並行度極限

並行度sink.shuffle-by-partition.enable

11k/s

4w/s

3+flase

21k/s

4w/s

2+true

任務1: 1k資料會被平均分配到3個hdfs檔案中,每個檔案資料行數為0.33k/s*checkpoint週期。

任務2:在資料不跨小時延遲的情況下,1k資料會在1個hdfs檔案中,每個檔案資料行數為1k/s*checkpoint週期。

這種情況下任務1的小檔案會是任務2的3倍。這也是為什麼我建議在效能跟得上時,將sink.shuffle-by-partition.enable設定為true。

關於flink 1.12的小檔案自動合併。

引數很簡單,就是按照設定的檔案大小合併。我想著重說下的是only files in a single checkpoint are compacted 這句話,只有同乙個checkpoint週期的檔案會合併。

所以對於資料量少的凌晨(場景二),任務1的小檔案數量會改善到任務2的水平,但是也無法完全避免小檔案的存在。

為了減少flink 寫hive的小檔案

1.效能滿足的情況下,盡量設定'sink.shuffle-by-partition.enable'=true

2.如果設定了'sink.shuffle-by-partition.enable'=false,建議使用flink 1.12版本的自動合併小檔案功能。

3.設定合理的checkpoint週期,業務允許的情況下,可以加大checkpoint週期,減少生成檔案的數量。

4.可以最大限度降低flink產生小檔案的情況,但是無法完全避免,根據實際情況定期合併小檔案。

附上使用spark3 合併 小檔案的攻略。

聊一聊hive資料傾斜

info基本資訊表 user id name agegender 1henry16男 2jack17男 3anny18女 4candy19女 5kate20女 burke 21frank 22ellen 23ken 24mili 25.score成績表 user id subject id scor...

聊一聊小甜餅

cookies程式設計 cookie是儲存在客戶端的小文字,儲存的位置分為兩種 cookie可能儲存在客戶端瀏覽器的所佔記憶體中,關閉瀏覽器後,cookies就不再存在。cookie也可能儲存在客戶pc機的硬碟上,設定有效時間,超過有效時間後失效。cookie的常見應用 簡化登入 很多 在登入時,可...

聊一聊 Flask 的 jsonify

首先我們來看一段 python from flask import flask,jsonify tasks api v1.0 tasks methods get defget tasks return jsonify if name main true 在這段 裡面,我們看到了今天的主角jsonif...