Kafka的Exactly Once和事務

2021-08-29 03:43:27 字數 2376 閱讀 7349

完整的exactly once時非常難以實現的,可以說時分布式訊息系統的核心問題。

kafka支援兩種事務,單獨的producer事務和接收-處理-傳送事務,不支援單純的consumer事務(說白了就是只有producer提供了事務api)

kafka在0.11.0.0之前的版本中只支援at least once和at most once語義,尚不支援exactly once語義。

但是在很多要求嚴格的場景下,如使用kafka處理交易資料,exactly once語義是必須的。我們可以通過讓下游系統具有冪等性來配合kafka的at least once語義來間接實現exactly once。但是:

在0.11以上版本,kafka stream api實現了exactly once語義。

1. broker儲存訊息後,傳送ack前宕機,producer認為訊息未傳送成功並重試,造成資料重複

2. 前一條訊息傳送失敗,後一條訊息傳送成功,前一條訊息重試後成功,造成資料亂序

單純的producer事務將保證傳送的訊息同時傳送成功或者同時無法傳送

properties props =

newproperties()

; props.

put(

"bootstrap.servers"

,"localhost:9092");

props.

put(

"transactional.id"

,"my-transactional-id");

producer

producer =

newkafkaproducer

<

>

(props,

newstringserializer()

,new

stringserializer()

);producer.

inittransactions()

;try

catch

(producerfencedexception

| outofordersequenceexception | authorizationexception e)

catch

(kafkaexception e)

producer.

close()

;

在上面的例子我們看到,再沒有commit之前,producer已經實際上將訊息傳送到了broker。consumer如果此時取到這些未commmit訊息,將無法處理也無法丟棄,只能快取起來等待broker確認。這顯然是乙個醜陋的設計。

為此,kafka新增了乙個很重要概念,叫做lso,即last stable offset。對於同乙個topicpartition,其offset小於lso的所有transactional message的狀態都已確定,要不就是committed,要不就是aborted。而broker對於read_committed的consumer,只提供offset小於lso的訊息。這樣就避免了consumer收到狀態不確定的訊息,而不得不buffer這些訊息。

另外,consumer會接收到引入了一種特殊型別的訊息,即control message。consumer通過這一類訊息,consumer通過該訊息過濾掉那些被abort的事務的訊息。

下面時kafka中最近經典的接受kafka——處理——傳送到kafka的例子。

producer

producer =

newkafkaproducer

(props)

;// 初始化事務,包括結束該transaction id對應的未完成的事務(如果有)

// 保證新的事務在乙個正確的狀態下啟動

producer.

inittransactions()

;// 開始事務

producer.

begintransaction()

;// 消費資料,這裡的groupid應該和下面producer.sendoffsetstotransaction指定的groupid一致。

consumerrecords

records = consumer.

poll

(100);

trycatch

(producerfencedexception

| outofordersequenceexception | authorizationexception e)

finally

Flink內部精確一次exactly once

flink 中的乙個大的特性就是exactly once的特性,我們在一般的流處理程式中,會有三種處理語義 我們在程式處理中,通常要求程式滿足exactly once,就是確保資料的準確性,不丟失,不重複,但是實現這樣的功能是比較複雜的,在flink中,是如何提供精確一次的特性呢?我覺得應該有兩方面...

Kafka 如何理解Kafka的「快」?

據了解,kafka吞吐量峰值每秒百萬,就算在記憶體個cpu都不高的情況下,最高可達每秒十萬,並且還能做到持久化儲存。kafka如此高吞吐率的原因是什麼?隨機寫每秒幾十幾百k。作業系統從磁碟將資料copy dma copy 到核心空間緩衝區 kernel buffer 應用程式從核心空間緩衝區 ker...

kafka 四 kafka的使用原理

在kafka中,topic是乙個儲存訊息的邏輯概念,可以認為是乙個訊息集合。每條訊息傳送到kafka集群的 訊息都有乙個類別。物理上來說,不同的topic的訊息是分開儲存的,每個topic可以有多個生產者向它傳送訊息,也可以有多個消費者去消費其中的訊息。每個topic可以劃分多個分割槽 每個topi...