flink 1 11 寫入hive效能低下問題

2021-10-25 06:26:21 字數 900 閱讀 2417

hive streming sink 投入生產,發現寫入效能底下,排查過後,發現是hive在每寫入一條資料,都會判斷檔案大小來決定檔案是否需要滾動,判斷檔案大小使用的hdfs的api,需要訪問namenode,這就是寫入效能底下的根源。截止到flink 1.11.2這個問題任然沒有解決。

這個問題解決,需要自己實現hivebulkwrite***ctory,**如下:

public class hivebulkwrite***ctory implements hadooppathbasedbulkwriter.factory

@override

public hadooppathbasedbulkwritercreate(path targetpath, path inprogresspath) throws ioexception

@override

public void dispose() catch (ioexception ignored)

}@override

public void addelement(rowdata element) throws ioexception

@override

public void flush()

@override

public void finish() throws ioexception

};}}

在hivetablesink#consumedatastream方法中,修改自己定義的hivebulkwrite***ctory。

hivebulkwrite***ctory hadoopbulkfactory = new hivebulkwrite***ctory(recordwrite***ctory);

Flink1 11記憶體模型與引數調整

flink taskmanager啟動日誌 total process memory flink總資源數 2048m,引數 taskmanager.memory.process.size jvm metaspace jvm元空間,引數 taskmanager.memory.jvm metaspace...

flink寫入HDFS中文亂碼

客戶端埋點日誌進行解析時需要獲取地區編碼和名稱,程式是通過flink分布式快取將地區編碼和名稱資料傳到每個task節點進行讀取。本地測試時沒有問題,但是部署到集群資料寫入hdfs後發現中文亂碼,部分 如下 設定分布式快取檔案位址 streamexecutionenvironment bsenv st...

1 11 flink中的動態載入udf jar包

專案中想要把flink做到平台化,只需要編輯sql便能把任務跑起來,開發過程中遇到乙個問題,就是如何能夠自動的載入自定義的函式包,因為專案中已經把main打包成乙個通用的jar,使用時只需要把sql資訊用引數形式傳入就可以.但是如果sql中需要使用到udf,那麼就需要實現flink的動態載入jar ...