噓!非同步事件這樣用真的好麼?

2022-01-09 23:27:01 字數 2827 閱讀 1785

今年年初的時候寫了一篇文章 《圍觀:基於事件機制的內部解耦之心路歷程》。這篇文章主要講的是用 es 資料異構的場景。程式訂閱 mysql binlog 的變更,然後程式內部使用 spring event 來分發具體的事件,因為乙個表的資料變更可能會需要更新多個 es 索引。

上圖的方案存在乙個問題,就是我們今天文章要聊的內容。

這個問題就是當 mq consumer 收到訊息後,就直接發布 event 了,如果是同步的,沒有問題。如果某個 eventlistener 中處理失敗了,那麼這條訊息將不會 ack。

如果是非同步發布 event 的場景,發布完訊息馬上就 ack 了。就算某個 eventlistener 中處理失敗了,mq 也感知不到,不會進行訊息的重新投遞,這就是存在的問題。

既然訊息已經 ack 了,那就不利用 mq 的重試功能了,使用方自己重試是不是也可以呢?

可肯定是可以的,內部處理是否成功肯定是可以知道的,如果處理失敗了可以預設重試,或者有一定策略的重試。實在不行還可以落庫,儲存記錄。

這樣的問題在於太煩了呀,每個使用的地方都要去做這件事情,而且對於未來接手你**的程式小哥哥來說,這很有可能讓小哥哥頭髮慢慢脫落啊。。。。

脫落不要緊,關鍵他還不知道要做這個處理,說不定哪天就背鍋了,慘兮兮。。。。

要保證訊息和業務處理的一致性,就不能立馬進行 ack 操作。而是要等業務處理完成後再決定是否要 ack。

如果有處理失敗的就不應該 ack,這樣就能復用 mq 的重試機制了。

分析下來,這就是乙個典型的非同步轉同步的場景。像 dubbo 中也有這個場景,所以我們可以借鑑 dubbo 中的實現思路。

建立乙個 defaultfuture 用於同步等待獲取任務執行結果。然後在 mq 消費的地方使用 defaultfuture。

@service

@rocketmqmessagelistener(topic = "$", consumergroup = "$")

public class datachangeconsume implements rocketmqlistener , thread {}", datachangerequest, thread.currentthread().getname());

datachangeevent event = new datachangeevent(this);

event.setchangetype(datachangerequest.getchangetype());

event.settable(datachangerequest.gettable());

event.setmessageid(datachangerequest.getmessageid());

boolean result = defaultfuture.get();

log.info("messageid {} 處理結果 {}", datachangerequest.getmessageid(), result);

if (!result) }}

newfuture() 會傳入事件引數,超時時間,任務數量幾個引數。任務數量是用於判斷所有 eventlistener 是否全部執行完成。

defaultfuture.get(); 這不就會阻塞,等待所有任務執行完成才會返回結果,如果所有業務都處理成功了,那麼會返回 true,流程結束,訊息自動 ack。

如果返回了 false 證明有處理失敗的或者超時的,就不需要 ack 了,丟擲異常等待重試。

public boolean get() 

long start = system.currenttimemillis();

lock.lock();

try

// 全部執行成功

if (isdone())

// 超時

if (system.currenttimemillis() - start > timeout)

}} catch (interruptedexception e) finally

return true;

}

isdone() 會判斷反饋結果了的任務數量跟總數量是否一致,如果一致就說明全部執行完成了。

public boolean isdone()
那麼任務執行完了怎麼反饋呢? 不可能讓每個使用的方法去關心,所以我們定義了乙個切面來做這件事情。

@aspect

@component

public class eventlisteneraspect catch (exception e) finally }}

通過 defaultfuture.received() 反饋執行結果。

public static void received(string id, boolean result) 

// 累加執行完成任務數量

future.feedbackresultcount.incrementandget();

if (future.isdone())

}}private void doreceived()

} finally

}

下面我們來總結整個流程:

需要注意的是每個 eventlistener 內部消費的邏輯都要做冪等控制。

事件驅動之非同步事件

public void head private void raiseevent headedevent headedevent 所以我們只需在 裡raiseevent就可以了。其實很簡單,因為我們要實現的是同步的事件,我們只需要找到所有處理這個事件的實現類,然後呼叫所有就可以了。public in...

Springboot event 非同步事件

springboot 啟動非同步事件監聽機制 springboot事件監聽 spring中使用 async註解使even監聽事件之間的執行變為非同步 springboot之非同步事件 spring event的事件驅動模型的最佳實踐 eventlistener springboot實現觀察者模式 發...

事件 委託 非同步同步

1 理解事情和委託是怎麼來的,為什麼要有委託,委託結合事情的魅力所在 2 ui執行緒中怎麼正確呼叫同步委託或非同步委託 invoke和begininvoke的區別是什麼 control.invoke和delegate.invoke有什麼區別?事件和委託原理的理解解析 invoke和begininvo...