flink架構師5 CEP 一致性 YARN

2021-10-11 17:50:50 字數 2413 閱讀 3469

我們使用flinkkafkaconumser,並且啟用checkpoint,偏移量會通過checkpoint儲存到state裡

面,並且缺省會寫入到kafka的特殊主體中,也就是__consumer_offset

setcommitoffsetsoncheckpoints 缺省會true,就是把偏移量寫入特殊主題中

flink自動重啟的過程中,讀取的偏移量是state中的偏移量,如果state裡面沒有那麼從

__consumer_offset裡讀取偏移量,如果__consumer_offset裡面沒有那麼就會從earliest或者lastest讀取資料

redis通過冪等性實現僅一次語義

4.1.5 寫kafka保證exactly once

兩階段提交

flink的兩階段提交

核心原始碼

通過冪等性實現僅一次語義

在分布式系統中,可以使用兩階段提交來實現事務性從而保證資料的一致性,兩階段提交分為:預提交階段與

提交階段,通常包含兩個角色:協調者與執行者,協調者用於用於管理所有執行者的操作,執行者用於執行具

體的提交操作,具體的操作流程:

首先協調者會送預提交(pre-commit)命令有的執行者

執行者執行預提交操作然後傳送一條反饋(ack)訊息給協調者

待協調者收到所有執行者的成功反饋,則傳送一條提交資訊(commit)給執行者

執行者執行提交操作

如果在流程2中部分預提交失敗,那麼協調者就會收到一條失敗的反饋,則會傳送一條rollback訊息給所有執

行者,執行回滾操作,保證資料一致性;但是如果在流程4中,出現部分提交成功部分提交失敗,那麼就會造

成資料的不一致,因此後面也提出了3pc或者通過其他補償機制來保證資料最終一致性

flink中兩階段提交是為了保證端到端的exactly once,主要依託checkpoint機制來實現,先看一下

checkpoint的整體流程,

1.jobmanager會週期性的傳送執行checkpoint命令(start checkpoint);

2.當source端收到執行指令後會產生一條barrier訊息插入到input訊息佇列中,當處理到barrier時

會執行本地checkpoint, 並且會將barrier傳送到下乙個節點,當checkpoint完成之後會傳送一條ack信

息給jobmanager;

當所有節點都完成checkpoint之後,jobmanager會收到來自所有節點的ack資訊,那麼就表示一次

完整的checkpoint的完成;

jobmanager會給所有節點傳送一條callback資訊,表示通知checkpoint完成訊息。接下來就可以

提交事務了

對比flink整個checkpoint機制呼叫流程可以發現與2pc非常相似,jobmanager相當於協調者,flink提

供了checkpointedfunction與checkpointlistener這樣兩個介面,checkpointedfunction中有

snapshotstate方法,每次checkpoint觸發執行方法,通常會將快取資料放入狀態中,可以理解為是乙個

hook,這個方法裡面可以實現預提交,checkpointlistener中有notifycheckpointcomplete方法,

checkpoint完成之後的通知方法,這裡可以做一些額外的操作,比如真正提交kafka的事務;在2pc中提到

如果對應流程2預提交失敗,那麼本次checkpoint就被取消不會執行,不會影響資料一致性.如果流程4失

敗,那麼重啟從上一次的checkpoints重新計算。

第一種【yarn-session.sh(開闢資源)+flink run(提交任務)】

啟動乙個一直執行的flink集群

/bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]

•執行任務

•./bin/flink run wordcount.jar --hostname *** --port 8888

停止任務 【web介面或者命令列執行cancel命令】

第二種【flink run -m yarn-cluster(開闢資源+提交任務)】

• 啟動集群,執行任務

•./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024

./examples/batch/wordcount.jar

注意:client端必須要設定yarn_conf_dir或者hadoop_conf_dir或者hadoop_home環境變數,通過這

個環境變數來讀取yarn和hdfs的配置資訊,否則啟動會失敗

基於袋鼠雲

Flink 狀態一致性

當在分布式系統中引入狀態時,自然也引入了一致性問題。一致性實際上是 正確性級別 的另一種說法,也就是說在成功處理故障並恢復之後得到的結果,與沒有發生任何故障時得到的結果相比,前者到底有多正確?舉例來說,假設要對最近一小時登入的使用者計數。在系統經歷故障之後,計數結果是多少?如果有偏差,是有漏掉的計數...

flink 狀態一致性(十三)

狀態一致性1.有狀態的流處理,內部每個運算元任務都可以有自己的狀態 2.對於流處理內部來說,所謂的狀態一致性就是我們所說的計算結果要保證準確 3.一條資料不丟失,也不重複計算 4.在遇到故障時可以恢復狀態,恢復以後的重新計算,結果應該也是完成正確的狀態一致性分類 1.exactly once 恰好處...

架構師之路 3 session一致性架構設計實踐

一 緣起 什麼是session?伺服器為每個使用者建立乙個會話,儲存使用者的相關資訊,以便多次請求能夠定位到同乙個上下文。web開發中,web server 可以自動為同乙個瀏覽器的訪問使用者自動建立 session 提供資料儲存功能。最常見的,會把使用者的登入資訊 使用者資訊儲存在 session...