Kafka 事務處理

2021-10-03 21:35:31 字數 4696 閱讀 4269

目錄

kafka中的冪等與事務

kafka中的事務實現

在說kafka的事務之前,先要說一下kafka中冪等的實現。冪等和事務是kafka 0.11.0.0版本引入的兩個特性,以此來實現eos(exactly once semantics,精確一次處理語義)。

冪等,簡單地說就是對介面的多次呼叫所產生的結果和呼叫一次是一致的。生產者在進行重試的時候有可能會重複寫入訊息,而使用kafka的冪等性功能之後就可以避免這種情況。開啟冪等性功能的方式很簡單,只需要顯式地將生產者客戶端引數enable.idempotence設定為true即可(這個引數的預設值為false)。

kafka是如何具體實現冪等的呢?kafka為此引入了producer id(以下簡稱pid)和序列號(sequence number)這兩個概念。每個新的生產者例項在初始化的時候都會被分配乙個pid,這個pid對使用者而言是完全透明的。對於每個pid,訊息傳送到的每乙個分割槽都有對應的序列號,這些序列號從0開始單調遞增。生產者每傳送一條訊息就會將對應的序列號的值加1。broker端會在記憶體中為每一對維護乙個序列號。對於收到的每一條訊息,只有當它的序列號的值(sn_new)比broker端中維護的對應的序列號的值(sn_old)大1(即sn_new = sn_old + 1)時,broker才會接收它。如果sn_new< sn_old + 1,那麼說明訊息被重複寫入,broker可以直接將其丟棄。如果sn_new> sn_old + 1,那麼說明中間有資料尚未寫入,出現了亂序,暗示可能有訊息丟失,這個異常是乙個嚴重的異常。引入序列號來實現冪等也只是針對每一對而言的,也就是說,kafka的冪等只能保證單個生產者會話(session)中單分割槽的冪等。冪等性不能跨多個分割槽運作,而事務可以彌補這個缺陷。

事務可以保證對多個分割槽寫入操作的原子性。操作的原子性是指多個操作要麼全部成功,要麼全部失敗,不存在部分成功、部分失敗的可能。為了使用事務,應用程式必須提供唯一的transactionalid,這個transactionalid通過客戶端引數transactional.id來顯式設定。事務要求生產者開啟冪等特性,因此通過將transactional.id引數設定為非空從而開啟事務特性的同時需要將enable.idempotence設定為true(如果未顯式設定,則kafkaproducer缺省會將它的值設定為true),如果使用者顯式地將enable.idempotence設定為false,則會報出configexception的異常。transactionalid與pid一一對應,兩者之間所不同的是transactionalid由使用者顯式設定,而pid是由kafka內部分配的。

另外,為了保證新的生產者啟動後具有相同transactionalid的舊生產者能夠立即失效,每個生產者通過transactionalid獲取pid的同時,還會獲取乙個單調遞增的producer epoch。如果使用同乙個transactionalid開啟兩個生產者,那麼前乙個開啟的生產者會報錯。從生產者的角度分析,通過事務,kafka可以保證跨生產者會話的訊息冪等傳送,以及跨生產者會話的事務恢復。前者表示具有相同transactionalid的新生產者例項被建立且工作的時候,舊的且擁有相同transactionalid的生產者例項將不再工作。後者指當某個生產者例項宕機後,新的生產者例項可以保證任何未完成的舊事務要麼被提交(commit),要麼被中止(abort),如此可以使新的生產者例項從乙個正常的狀態開始工作。

kafkaproducer提供了5個與事務相關的方法,詳細如下:

void inittransactions();

void begintransaction() throws producerfencedexception;

void sendoffsetstotransaction(mapoffsets, string consumergroupid)throws producerfencedexception;

void committransaction() throws producerfencedexception;

void aborttransaction() throws producerfencedexception;

在消費端有乙個引數isolation.level,與事務有著莫大的關聯,這個引數的預設值為「read_uncommitted」,意思是說消費端應用可以看到(消費到)未提交的事務,當然對於已提交的事務也是可見的。這個引數還可以設定為「read_committed」,表示消費端應用不可以看到尚未提交的事務內的訊息。舉個例子,如果生產者開啟事務並向某個分割槽值傳送3條訊息msg1、msg2和msg3,在執行committransaction()或aborttransaction()方法前,設定為「read_committed」的消費端應用是消費不到這些訊息的,不過在kafkaconsumer內部會快取這些訊息,直到生產者執行committransaction()方法之後它才能將這些訊息推送給消費端應用。反之,如果生產者執行了aborttransaction()方法,那麼kafkaconsumer會將這些快取的訊息丟棄而不推送給消費端應用。

kafka 的事務處理,主要是允許應用可以把消費和生產的 batch 處理(涉及多個 partition)在乙個原子單元內完成,操作要麼全部完成、要麼全部失敗。為了實現這種機制,我們需要應用能提供乙個唯一 id,即使故障恢復後也不會改變,這個 id 就是 transactionnalid(也叫 txn.id,後面會詳細講述),txn.id 可以跟內部的 pid 1:1 分配,它們不同的是 txn.id 是使用者提供的,而 pid 是 producer 內部自動生成的(並且故障恢復後這個 pid 會變化),有了 txn.id 這個機制,就可以實現多 partition、跨會話的 eos 語義。

當使用者使用 kafka 的事務性時,kafka 可以做到的保證:

上面是從 producer 的角度來看,那麼如果從 consumer 角度呢?consumer 端很難保證乙個已經 commit 的事務的所有 msg 都會被消費,有以下幾個原因:

簡單總結一下,關於 kafka 事務性語義提供的保證主要以下三個:

kafka 事務性的使用方法也非常簡單,使用者只需要在 producer 的配置中配置transactional.id,通過inittransactions()初始化事務狀態資訊,再通過begintransaction()標識乙個事務的開始,然後通過committransaction()aborttransaction()對事務進行 commit 或 abort,示例如下所示:

properties props = new properties();

props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");

props.put("client.id", "producertranscationnalexample");

props.put("bootstrap.servers", "localhost:9092");

props.put("transactional.id", "test-transactional");

props.put("acks", "all");

kafkaproducer producer = new kafkaproducer(props);

producer.inittransactions();

try catch (producerfencedexception e1) catch (kafkaexception e2)

producer.close();

事務性要解決的問題

再來分析一下,kafka 提供的事務性是如何解決上面兩個問題的:

事務性實現的關鍵

對於 kafka 的事務性實現,最關鍵的就是其事務操作原子性的實現。對於乙個事務操作而言,其會涉及到多個 topic-partition 資料的寫入,如果是乙個 long transaction 操作,可能會涉及到非常多的資料,如何才能保證這個事務操作的原子性(要麼全部完成,要麼全部失敗)呢?

關於這點,最容易想到的應該是引用 2pc 協議(它主要是解決分布式系統資料一致性的問題)中協調者的角色,它的作用是統計所有參與者的投票結果,如果大家一致認為可以 commit,那麼就執行 commit,否則執行 abort:

有了上面的機制,是不是就可以了?很容易想到的問題就是 transactioncoordinator 掛的話怎麼辦?transactioncoordinator 如何實現高可用?

有了上面的機制,就夠了麼?我們再來考慮一種情況,我們期望乙個 producer 在 fail 恢復後能主動 abort 上次未完成的事務(接上之前未完成的事務),然後重新開始乙個事務,這種情況應該怎麼辦?之前冪等性引入的 pid 是無法解決這個問題的,因為每次 producer 在重啟時,pid 都會更新為乙個新值:

再來考慮乙個問題,在具體的實現時,我們應該如何標識乙個事務操作的開始、進行、完成的狀態?正常來說,乙個事務操作是由很多操作組成的乙個操作單元,對於 transactioncoordinator 而言,是需要準確知道當前的事務操作處於哪個階段,這樣在容錯恢復時,新選舉的 transactioncoordinator 才能恢復之前的狀態:

PB事務處理

1 資料視窗更新,只要dberror有錯誤,而事先沒有做過任何commit工作,那麼rollback可以回滾到上次commit位置,即上次commit後所有的資料將被回滾。2 如果是直接寫入sql語句,只要資料庫出現錯誤,那麼rollback可以回滾到上次commit的位置,即上次commit後所有...

MySQL事務處理

start transaction,commit和rollback語法 start transaction begin work commit work and no chain no release rollback work and no chain no release set autocom...

ASP事務處理

asp事務處理。測試資料庫為sql server,伺服器為本機,資料庫名為test,表名為a,兩個欄位id int 主鍵標識,num int set conn server.createobject adodb.connection strconn provider sqloledb.1 persi...