Flink檢查點問題

2021-09-20 03:46:43 字數 3779 閱讀 5123

解決思路

小結

cannot find meta data file '_metadata'

in directory 'hdfs://nn-ha-service/flink/flink-checkpoints'

首先我們來掃盲一下相關知識點。

官網是這樣定義的:檢查點通過允許恢復狀態和相應的流位置使flink中的狀態容錯,從而為應用程式提供與無故障執行相同的語義。

檢查點的主要目:在意外的作業失敗時提供恢復機制。

checkpoint的生命週期由flink管理,即flink建立,擁有和發布checkpoint - 無需使用者互動。作為一種恢復和定期觸發的方法,checkpoint實現的兩個主要設計目標是:

i)建立輕量級。

ii)盡可能快地恢復。

有兩種方式:

通過配置檔案全域性配置

這種配置方式是全域性的,即預設檢查點路徑。

搭建過集群的同學應該知道,配置檔案一般都在conf目錄下。沒錯,flink也是一樣,我們需要修改的就是 flink/conf/flink-conf.yaml 這個檔案。

在配置檔案裡面將下面這個引數配置上:

state.checkpoints.dir: hdfs:

//nn-

ha-service/flink/flink-checkpoints

通過在作業裡配置

就是在每個作業裡,自定義檢查點路徑,優先順序高於預設檢查點路徑。

如果沒有定義,就走預設檢查點路徑。

streamexecutionenvironment env = streamexecutionenvironment.

getexecutionenvironment()

; env.

setstatebackend

(new

rocksdbstatebackend

("hdfs:///flink/flink-checkpoints/"

);

flink預設是不會啟用檢查點的,這需要我們在**裡面設定。

**如下:

streamexecutionenvironment env = streamexecutionenvironment.

getexecutionenvironment()

;//設定檢查點間隔時間

env.

enablecheckpointing

(60000

);

對,我們只需要設定下檢查點的時間間隔,單位是ms,就說明啟動了了乙個一分鐘完成一次的檢查點策略了。

使用檢查點,即在意外的作業失敗後恢復資料。

這時,我們只需要指定檢查點路徑重啟任務即可。

官網給的示例:

$ bin/flink run -s :checkpointmetadatapath [

:runargs]

checkpointmetadatapath : 這個是檢查點元資料路徑,並不簡單是所配置的檢查點的路徑。筆者遇到的問題關鍵就在於此。

由報出來的錯誤我們不難看出,flink是希望找到_metadata這個元資料檔案,可我並沒有指定此檔案,所以報錯了。

先介紹下我用的啟動命令:

flink run -d -c com.flinkcheckpointtest \

-s hdfs:

//nn-

ha-service/flink/flink-checkpoints/ \

-m yarn-cluster \

-yjm 1024m -ytm 1024m -ynm flinkcheckpointtest \

/home/jar/flink_test.jar

想必這個命令知道flink的同學都很熟悉,不熟悉的可以參考下flink官網(

因此我去檢查點目錄下查詢了一番,終於讓我找到了 _metadata 檔案。

如下圖所示:

我們仔細看下這個目錄:

/flink/flink-checkpoints/a9fcbc1efb8124ae998d0cbea62e7eae/chk-

2836

前面兩層目錄是我們設定的檢查點目錄,後面一串像是id。

最後那個chk-2836應該就是實際檢查點的目錄,2836代表當前作業已經生成過2836次檢查點了。最新的檢查點覆蓋之前的檢查點。

/flink/flink-checkpoints/a9fcbc1efb8124ae998d0cbea62e7eae/ 該目錄下面就乙個chk開頭的目錄,即始終儲存最新的目錄,節省hdfs資源。

中間那一串id經過仔細查閱官網得知是jobid

好,那麼現在主要的問題就是如何尋找對應作業的jobid

有兩種情況:

作業正在執行

這種情況,可以直接去flink的管理介面找到。如下圖:

作業已經停止

這種情況,就需要去看此作業的詳細日誌了。如下圖:

jobmanager.log 日誌裡面多次出現 作業名和 jobid

實際工作中,用到檢查點的時候肯定是因為作業異常中斷,所以大多數情況都是通過第二種方式去尋找作業對應的 jobid

修改後的提交作業命令:

flink run -d -c com.flinkcheckpointtest \

-s hdfs:

//nn-

ha-service/flink/flink-checkpoints/

541eaf031c6f8ed50916342b2baafe98/chk-

13 \

-m yarn-cluster \

-yjm 1024m -ytm 1024m -ynm flinkcheckpointtest \

/home/jar/flink_test.jar

執行了一段時間,作業異常中斷,發現是記憶體溢位錯誤,故適當調節記憶體,最終提交命令如下:

flink run -d -c com.flinkcheckpointtest \

-s hdfs:

//nn-

ha-service/flink/flink-checkpoints/

541eaf031c6f8ed50916342b2baafe98/chk-

13 \

-m yarn-cluster \

-yjm 4096m -ytm 4096m -ynm flinkcheckpointtest \

/home/jar/flink_test.jar

好了,至此,問題圓滿解決。

這裡順便總結下flink作業異常中斷的操作流程。

找出作業對應的jobid

進入hdfs對應目錄,找到目錄下面最新的檢查點目錄

通過指定檢查點目錄的方式重新啟動作業

待作業執行穩定,檢視作業最初異常中斷的原因,記錄下來並總結思考如何解決和避免。

檢查點 為什麼要插入檢查點 檢查點的作用

一 為什麼要插入檢查點 檢查點的作用 檢查點記錄被測系統的預期結果,在執行過程中,qtp將預期結果與實際執行結果進行比較,若一致,測試結果報告中,檢查點為passed,否則為failed。只有插入檢查點的 才具有測試能力,檢查功能點是否實現 二 標準檢查點 standard checkpoint 檢...

功能測試檢查點

測試物件 flight 程式 c s 架構 檢查mercury 是否顯示在 之間 dim a,ba window flight reservation winedit order no 4 getroproperty text b cstr a msgbox b 正規表示式檢查 if語句判定成功或者...

lr VuGen(事務 檢查點)

事務的應用 在每乙個請求前後加上transaction 和 end transaction,選單欄insert new step 記錄請求的響應時間,其中end transaction有status選項,有4個選項,若是auto就不用手工判斷事務的執行結果,但這個結果不夠準確。2 統計事務的成功率 ...