RocketMQ解決冪等性問題

2022-09-02 22:54:32 字數 2995 閱讀 5601

一.造成重複消費的原因

在於回饋機制。正常情況下,消費者在消費訊息時候,消費完畢後,會傳送乙個ack確認資訊給訊息佇列(broker),訊息佇列(broker)就知道該訊息被消費了,就會將該訊息從訊息佇列中刪除。

不同的訊息佇列傳送的確認資訊形式不同,例如rabbitmq是傳送乙個ack確認訊息,rocketmq是返回乙個consume_success成功標誌,kafka實際上有個offset的概念。

造成重複消費的原因?,就是因為網路原因閃斷,ack返回失敗等等故障,確認資訊沒有傳送到訊息佇列,導致訊息佇列不知道自己已經消費過該訊息了,再次將該訊息分發給其他的消費者。(因為訊息重試等機制的原因,如果乙個consumer斷了,rocketmq有consumer集群,會將該訊息重新發給其他consumer)

這個問題針對業務場景來答,分以下三種情況:

(1)比如,你拿到這個訊息做資料庫的insert操作,那就容易了,給這個訊息做乙個唯一的主鍵,那麼就算出現重複消費的情況,就會導致主鍵衝突,避免資料庫出現髒資料。

(2)再比如,你拿到這個訊息做redis的set的操作,那就容易了,不用解決,因為你無論set幾次結果都是一樣的,set操作本來就算冪等操作。

(3)如果上面兩種情況還不行,上大招。準備乙個第三方介質,來做消費記錄。以redis為例,給訊息分配乙個全域性id,只要消費過該訊息,將以k-v形式寫入redis.那消費者開始消費前,先去redis中查詢有沒有消費記錄即可。

二.單機環境解決方案

生產者:傳送訊息同時set乙個key做唯一標識

public

static

void main(string args) throws

mqclientexception

} catch

(exception e)

producer.shutdown();

}

消費者:

//

儲存標識的集合

static

private maplogmap = new hashmap<>();

public

static

void main(string args) throws

mqclientexception

msgid =msg.getmsgid();

system.out.println("key:" + key + ",msgid:" + msgid + "---" + new

string(msg.getbody()));

//模擬異常

int i = 1 / 0;}}

catch

(exception e)

finally

return

consumeconcurrentlystatus.consume_success;

}});

consumer.start();

system.out.println("consumer started.");

}

執行效果:

三.集群環境解決方案

在生產者端要保證冪等性的話,大概可以使用以下兩種方式:

① rocketmq支援訊息查詢的功能,只要去rocketmq查詢一下是否已經傳送過該條訊息就可以了,不存在則傳送,存在則不傳送

② 引入redis,在傳送訊息到rocketmq成功之後,向redis中插入一條資料,如果發生重試,則先去redis中查詢一下該條訊息是否已經傳送過了,存在的話就不重**送訊息了

生產者的這兩種冪等性方案都可以實現,但是都存在一定的缺陷

方案①,rocketmq訊息查詢的效能不是特別好,如果是在高併發的場景下,每條訊息在傳送到rocketmq時都去查詢一下,可能會影響介面的效能

方案②,在一些極端的場景下,redis也無法保證訊息傳送成功之後,就一定能寫入redis成功,比如寫入訊息成功而redis此時宕機,那麼再次查詢redis判斷訊息是否已經傳送過,是無法得到正確結果的

既然在消費者做冪等性的方案都不是特別靠譜,那就再在消費者端來做吧

訊息的消費,最後都對應的是資料庫的操作,只要在訊息消費的時候,判斷一下資料庫中是否已經消費過了這條訊息,就可以保證冪等性了,例如使用唯一索引,保證一條訊息只被消費一次。

參考:去重原則:1.冪等性 2.業務去重

冪等性:(處理必須唯一) 無論這個業務請求被(consumer)執行多少次,我們的資料庫的結果都是唯一的,不可變的。

去重策略:去重表機制,業務拼接去重策略(比如唯一流水號)

1.建立乙個訊息表,拿到這個訊息做資料庫的insert操作。給這個訊息做乙個唯一主鍵(primary key)或者唯一約束,那麼就算出現重複消費的情況,就會導致主鍵衝突。

高併發下去重:採用redis去重(key天然支援原子性並要求不可重複),但是由於不在乙個事務,要求有適當的補償策略,但是對於很重要的業務,不應該支援補償

2.利用redis事務,主鍵(我們必須把全量的運算元據都存放在redis裡,然後定時去和資料庫做資料同步)—-即消費處理後,該處理本來應該儲存在資料庫的,先儲存在redis,再通過一定業務方式從redis中取資料進行db持久化

3.利用redis和關係型資料庫一起做去重機制

4.拿到這個訊息做redis的set的操作.redis就是天然冪等性 

5.準備乙個第三方介質,來做消費記錄。以redis為例,給訊息分配乙個全域性id,只要消費過該訊息,將 < id,message>以k-v形式寫入redis。那消費者開始消費前,先去redis中查詢有沒消費記錄即可。

訊息重複消費是乙個非常常見的問題,在很多系統呼叫頻繁的場景下,都可能會出現超時重試的情況,還有就是系統頻繁迭代,經常重啟系統更新的場景,也會出現訊息重複消費

生產者端傳送重複的訊息到rocketmq中其實問題不大,訊息只是在rocketmq中重複了,並沒有影響到系統的資料,我們只需要在最後修改資料庫的時候,保證好冪等性即可

冪等性問題和解決方法

在實際的開發專案中,乙個對外暴露的介面往往會面臨很多次請求。這就需要考慮到乙個冪等性問題。冪等性的概念是 任意多次執行所產生的影響均與一次執行的影響相同,即無論你請求了多少次,對資料庫的影響都只能有一次,不能重複處理。所以,按照上面的理解,每次執行的結果都會發生變化,就是非冪等的。如下面三條sql,...

MQ的冪等性問題

冪等性問題 1 生產者已把訊息傳送到mq,在mq給生產者返回ack的時候網路中斷,故生產者未收到確定資訊,生產者認為訊息未傳送成功,但實際情況是,mq已成功接收到了訊息,在網路重連後,生產者會重新傳送剛才的訊息,造成mq接收了重複的訊息 2 消費者在消費mq中的訊息時,mq已把訊息傳送給消費者,消費...

RabbitMQ 訊息冪等性問題

關於mq消費者的冪等性問題,在於mq的重試機制,因為網路原因或客戶端延遲消費導致重複消費。使用mq重試機制需要注意的事項以及如何解決消費者冪等性問題以下將逐一講解。1.rabbitmq自動重試機制 消費者在消費訊息的時候,如果消費者業務邏輯出現程式異常,這個時候我們如何處理?使用重試機制,rabbi...