Flink exactly once 實戰筆記

2021-10-04 16:26:20 字數 1496 閱讀 3979

snak1enn關注

0.3532019.05.21 10:59:06字數 1,501閱讀 1,162

眾所周知,flink在很早的時候就通過checkpointing提供了exactly-once的semantic,不過僅限於自身或者是從kafkaconsumer中消費資料。而在flink 1.4版本的時候加入了赫赫有名的twophasecommitsinkfunction,提供了end-to-end的exatcly-once語言,當然是在需要下游支援回滾的情況下,具體的概念和設計方式官網已經寫的比較清楚,就不多加贅述。而對於kafkaproducer,kafka在0.11版本之後支援transaction,也就意味著支援對寫入資料的commit和rollback,在通過flink寫入到kafka的應用程式中可以達到exactly-once的效果。

接下來展示一下如何在flink應用程式中啟用exactly-once語義。對於sourcefunction大家隨意採用一種即可,檔案,kafka topic等皆可。而主要部分是在於對flinkkafkaproducer的初始化。我使用的是flink1.7版本使用的producer類為flinkkafkaproducer011,觀察它的建構函式,很容易發現有的建構函式中需要你傳入乙個列舉變數semantic, 有三種可選值none,at_least_once,exactly_once,而預設值為at_least_once,很顯然我們在這裡需要使用exactly_once。不過在此之前,我們需要仔細閱讀一下flink官網flink-kafka-connector的內容,其中提到,kafka broker的transaction.max.timeout.ms預設為15分鐘,而flinkkafkaproducer011預設的transaction.timeout.ms為1個小時,遠遠超出了broker的最大超時時間,這種情況下如果你的服務掛了超過15分鐘,就會造成資料丟失。所以如果需要你的producer支援的更長的事務時間就需要提高kafka broker transaction.max.timeout.ms的值。下面是乙個簡單的例項去使用exactly-once語義的flinkkafkaproducer。

flinkkafkaproducerproducer = new flinkkafkaproducer<>(

topics,

properties,

flinkkafkaproducer011.semantic.exactly_once

)

這麼做的話flink sink到kafka中在大部分情況下就都能保證exactly-once。值得注意的是,所有通過事務寫入的kafka topic, 在消費他們的時候,必須給消費者加上引數isolation.level=read_committed,這是因為kafka的事務支援是給寫入的資料分為committed和uncomitted,如果使用預設配置的consumer,讀取的時候依然會讀取所有資料而不是根據事務隔離。

Android原始碼設計模式解析與實戰筆記

2.builder模式 適用於 物件初始化複雜 產品類複雜不同呼叫順序產生最終型別不同 方法不同呼叫順序產生最終結果不同。鏈式呼叫直觀。3.原型模式 使用者通過從乙個樣板物件中轉殖出乙個內部屬性一致的物件。跳過 4.工廠方法模式 定義乙個用於建立物件的介面。讓其子類決定例項化哪個物件。複雜的物件時候...

Spring Spring實戰第四版 學習筆記

1.component註解如果沒有指定bean name,預設使用類名但類名的第乙個字母小寫作為bean name.如下面的 生死的bean的名字就是 testbean1,component public class testbean1 但是如何你的bean的命名不規則,則可能出現例外,如下面的 生...

OAuth 2 0實戰課 07 08 筆記

極客時間 oauth 2.0實戰課 07 08 筆記 移動端使用oauth2.0 code challenge method s256即code verifier code challenge base64url encode sha256 ascii code verifier 按照上圖訪問步驟 ...