開發中使用RabbitMQ的注意事項

2021-07-28 04:04:52 字數 3197 閱讀 9807

使用訊息佇列處理訊息的時候,我們可能會遇到以下問題:

訊息處理失敗

訊息體本身有誤

訊息重複處理

訊息丟失

對於訊息處理失敗,有可能有由於網路波動導致的資料處理異常,待網路穩定時訊息就會正常處理,對於這種處理失敗,我們應該繼續嘗試去處理訊息。

訊息體本身有誤,這會導致訊息連續處理失敗,占用較多的資源,寫大量的無用日誌,這種錯誤應該丟棄這部分無用訊息,但要記錄下日誌,記清訊息體本身資料,以及丟棄訊息的原因。

訊息重複處理,例如我們通過訊息佇列向資料庫中新增資料,由於資料庫網路波動,導致資料庫連線超時,而我們的系統認為訊息處理失敗,就會把訊息回滾到訊息佇列,繼續嘗試處理,這時就會造成訊息重複處理的現象,對於重要的訊息,我們可以每處理一條訊息,就記錄一下,處理新的訊息時,進行判斷訊息是否已經處理,如果已經處理,就丟棄訊息。

由於spring 與rabbitmq整合 對訊息的處理方式是預設自動應答,也就是處理訊息時無論是否出現異常,都會給訊息佇列應答處理成功,訊息佇列刪除訊息,這時就會出現訊息丟失的情況,為了解決這個問題,我們需要使用手動應答的方式處理訊息。

rabbitmq消費者***的配置

<

rabbit

:listener

-container connection

-factory

="connectionfactory"

acknowledge

="manual"

>

<

rabbit

:listener queues

="queuename"

ref=

"listiner"

/>

rabbit

:listener

-container

>

acknowledge

="manual" 就表示該***手動應答訊息

消費者***的編寫

為了能夠手動應答訊息,我們編寫的***需要實現channelawaremessagelistener,重寫onmessage()方法,裡面有兩個引數

message 

message

, channel 

channel,message 是訊息體本身,channel是rabbitmq的連線通道。

異常的處理

訊息處理正常,沒有丟擲異常,這時我們需要手動應答訊息

channel

.basicack

(message

.getmessageproperties

().getdeliverytag

(),false

);

當出現異常時,我們需要把這個訊息回滾到訊息佇列,有兩種方式

//ack返回false,並重新回到佇列,api裡面解釋得很清楚

channel

.basicnack

(message

.getmessageproperties

().getdeliverytag

(),false

,true

);

//拒絕訊息

channel

.basicreject

(message

.getmessageproperties

().getdeliverytag

(),true

);

經過開發中的實際測試,當訊息回滾到訊息佇列時,這條訊息不會回到佇列尾部,而是仍是在佇列頭部,這時消費者會立馬又接收到這條訊息,進行處理,接著丟擲異常,進行回滾,如此反覆進行。這種情況會導致訊息佇列處理出現阻塞,訊息堆積,導致正常訊息也無法執行。對於訊息回滾到訊息佇列,我們希望比較理想的方式時出現異常的訊息到達訊息佇列尾部,這樣既保證訊息不會丟失,又保證了正常業務的進行,因此我們採取的解決方案是,將訊息進行應答,這時訊息佇列會刪除該訊息,同時我們再次傳送該訊息到訊息佇列,這時就實現了錯誤訊息進行訊息佇列尾部的方案。

//手動進行應答

channel

.basicack

(message

.getmessageproperties

().getdeliverytag

(),false

);

//重新傳送訊息到隊尾

channel

.basicpublish

(message

.getmessageproperties

().getreceivedexchange

(),

message

.getmessageproperties

().getreceivedroutingkey

(),messageproperties

.persistent_text_plain

,

json

.tojsonbytes

(new

object

()));

對於第三條中的解決方案會存在乙個問題,如果乙個訊息體本身有誤,會導致該訊息體,一直無法進行處理,而伺服器中刷出大量無用日誌。解決這個問題可以採取兩種方案

一種是對於日常細緻處理,分清哪些是可以恢復的異常,哪些是不可以恢復的異常。對於可以恢復的異常我們採取第三條中的解決方案,對於不可以處理的異常,我們採用記錄日誌,直接丟棄該訊息方案。

另一種是我們對每條訊息進行標記,記錄每條訊息的處理次數,當一條訊息,多次處理仍不能成功時,處理次數到達我們設定的值時,我們就 丟棄該訊息,但需要記錄詳細的日誌。

使用手動應答訊息,有一點需要特別注意,那就是不能忘記應答訊息,因為對於rabbitmq來說處理訊息沒有超時,只要不應答訊息,他就會認為仍在正常處理訊息,導致訊息佇列出現阻塞,影響業務執行。

C 開發DLL中使用new和delete注意事項

報錯情況 1,在 dll 中用 new 來建立宿主程式中的物件,然後把這個物件指標儲存到宿主程式,當 dll 被解除安裝後,凡是涉及到這個指標的呼叫都會報錯,包括 delete 這個指標也會有錯。2,在dll中new出乙個物件,然後在不需要使用時進行delete,結果會報如下錯誤 分析原因 因為ne...

delphi中使用override需要注意的地方

在override時,如果override的是procedure,則加上關鍵字inherited 就會執行父類同名procedure的所有過程,然後再執行子類中特有的過程。如果override的是function則不會執行父類中同名function的內容。但是,可以通過下面的方法來執行父類的內容 例...

shell中使用while迴圈ssh的注意事項

需要讀取乙個文字,次文字每一行包含乙個ip在while迴圈中使用ssh,但ssh完第一行後就退出了,如何避免自動讀取一行就跳出while迴圈,此文將詳細解釋其原因。最近在寫乙個自動更新的shell,可是發現如果在使用while迴圈從乙個檔案中讀取ip位址,然後訪問就只能讀取第一行紀錄。如下 whil...