訊息佇列的常見模式

2021-09-25 14:02:09 字數 3983 閱讀 7411

push即服務端主動傳送資料給客戶端。在服務端收到訊息之後立即推送給客戶端。

當 producer 發出的訊息到達後,服務端馬上將這條訊息投遞給 consumer。

客戶端連線到broker之後,啟動乙個執行緒,這個執行緒的任務就是迴圈呼叫方法從broker中拉取相應的訊息至本地。如果是非同步方法呼叫,則直接呼叫***方法,間接呼叫業務消費訊息的方法,而不使用本地記憶體進行訊息的快取;所以這裡的非同步只是客戶端的非同步,而非broker的主動推送。通過這種方式既能解決多客戶端的連線,也能解決類似服務端的push型的訊息推送。在網際網路中這種實現才具有普便性,因為這種方式即解決了效能問題又解決了非同步訊息的需求。

push模型最大的好處就是實時性。因為服務端可以做到只要有訊息就立即推送,所以訊息的消費沒有「額外」的延遲。

pull模式由consumer主動從broker獲取訊息。

當服務端收到這條訊息後什麼也不做,只是等著 consumer 主動到自己這裡來讀,即 consumer 這裡有乙個「拉取」的動作。

客戶端(指乙個connection,一般情況指乙個tcp的連線建立)連線到broker之後,啟動乙個執行緒,這個執行緒的任務就是迴圈呼叫方法從broker中拉取相應的訊息至本地。如果是同步方法呼叫獲取則將相應的訊息存放在本地記憶體中,當同步方法消費訊息時,則從該記憶體區中直接獲取相應的訊息進行消費;

這樣帶來了一些好處:

當 producer 速率大於 consumer 速率時,有兩種可能性:一種是 producer 本身的效率就要比 consumer 高(例如,consumer 端處理訊息的業務邏輯可能很複雜,或者涉及到磁碟、網路等 i/o 操作);另一種是 consumer 出現故障,導致短時間內無法消費或消費不暢。

push 方式由於無法得知當前 consumer 的狀態,所以只要有資料產生,便會不斷地進行推送,在以上兩種情況下時,可能會導致 consumer 的負載進一步加重,甚至是崩潰(例如生產者是 flume 瘋狂抓日誌,消費者是 hdfs+hadoop,處理效率跟不上)。除非consumer 有合適的反饋機制能夠讓服務端知道自己的狀況。

而採取 pull 的方式問題就簡單了許多,由於 consumer 是主動到服務端拉取資料,此時只需要降低自己訪問頻率即可。舉例:如前端是 flume 等日誌收集業務,不斷向 cmq 生產訊息,cmq 向後端投遞,後端業務如資料分析等業務,效率可能低於生產者。

採用 push 的方式時,一旦訊息到達,服務端即可馬上將其推送給消費端,這種方式的實時性顯然是非常好的;而採用 pull 方式時,為了不給服務端造成壓力(尤其是當資料量不足時,不停的輪詢顯得毫無意義),需要控制好自己輪詢的間隔時間,但這必然會給實時性帶來一定的影響。

pull 模式存在的問題:由於主動權在消費方,消費方無法準確地決定何時去拉取最新的訊息。如果一次 pull 取到訊息了還可以繼續去 pull,如果沒有 pull 取到訊息則需要等待一段時間再重新 pull。

由於等待時間很難判定。您可能有 xx 動態拉取時間調整演算法,但問題的本質在於,是否有訊息到來不是由消費方決定。也許1分鐘內連續到來1000條訊息,接下來的半個小時卻沒有新訊息產生,可能您的演算法算出下次最有可能到來的時間點是31分鐘之後,或者60分鐘之後,結果下條訊息10分鐘後到達。

當然,延遲也有對應的解決方案,業界較成熟的做法是從短時間開始(不會對 cmq broker 有太大負擔),然後指數級增長等待。例如開始等5ms,然後10ms,然後20ms,然後40ms,以此類推,直到有訊息到來,然後再回到5ms。即使這樣,依然存在延遲問題:假設40ms到80ms之間的50ms訊息到來,訊息就延遲了30ms,對於半個小時來一次的訊息,這些開銷就是白白浪費的。

cmq 提供了長輪詢的優化方法,用以平衡 pull/push 模型各自的缺點。基本方式是:消費者如果嘗試拉取失敗,不是直接 return,而是把連線掛在那裡 wait,服務端如果有新的訊息到來,把連線拉起,返回最新訊息

首先,在 consumer 偶然宕機或下線時,producer 的生產是可以不受影響的,consumer 上線後,可以繼續之前的消費,此時訊息資料不會丟失;但是如果 consumer 長期宕機或是由於機器故障無法再次啟動,就會出現問題,即服務端是否需要為 consumer 保留資料,以及保留多久的資料等等。

採用 push 方式時,因為無法預知 consumer 的宕機或下線是短暫的還是持久的,如果一直為該 consumer 保留自宕機開始的所有歷史訊息,那麼即便其他所有的 consumer 都已經消費完成,資料也無法清理掉,隨著時間的積累,佇列的長度會越來越大,此時無論訊息是暫存於記憶體還是持久化到磁碟上(採用 push 模型的系統,一般都是將訊息佇列維護於記憶體中,以保證推送的效能和實時性,這一點會在後邊詳細討論),都將對 cmq 服務端造成巨大壓力,甚至可能影響到其他 consumer 的正常消費,尤其當訊息的生產速率非常快時更是如此;但是如果不保留資料,那麼等該 consumer 再次起來時,則要面對丟失資料的問題。

折中的方案是:cmq 給資料設定乙個超時時間,當 consumer 宕機時間超過這個閾值時,則清理資料;但這個時間閾值也並不容易確定。

在採用 pull 模型時,情況會有所改善;服務端不再關心 consumer 的狀態,而是採取「你來了我才服務」的方式,consumer 是否能夠及時消費資料,服務端不會做任何保證(也有超時清理時間)。

「在broker一直有可讀訊息的情況下,long-polling就等價於執行間隔為0的pull模式(每次收到pull結果就發起下一次pull請求)。」

這是long-polling在服務端一直有可消費訊息的處理情況。在這個情況下,一條訊息如果在long-polling請求返回時到達服務端,那麼它被consumer消費到的延遲是:

假設broker和consumer之間的一次網路開銷時間為r毫秒,

那麼這條訊息需要經歷3r才能到達consumer

第乙個r:訊息已經到達broker,但是long-polling請求已經讀完資料準備返回consumer,從broker到consumer消耗了r

第二個r:consumer收到了broker的響應,發起下一次long-polling,這個請求到達broker需要乙個r

的時間第三個r:broker收到請求讀取了這條資料,那麼返回到consumer需要乙個r的時間

所以總共需要3r(不考慮讀取的開銷,只考慮網路開銷)

另外,在這種情況下broker和consumer之間一直在進行請求和響應(long-polling變成了間隔為0的pull)。

考慮這樣一種方式,它有long-polling的優勢,同時能減少在有訊息可讀的情況下由broker主動push訊息給consumer,減少不必要的請求

在訊息中介軟體的consumer中會有乙個buffer來快取從broker獲取的訊息,而使用者的消費執行緒從這個buffer中獲取消費來訊息,獲取訊息的執行緒和消費執行緒通過這個buffer進行資料傳遞。

有這個buffer的存在,是否可以在long-polling請求時將buffer剩餘空間告知給broker,由broker負責推送資料。此時broker知道最多可以推送多少條資料,那麼就可以控制推送行為,不至於沖垮consumer。

最後來了解一下廣播,先看一下廣播的概念:

廣播訊息是指生產者產生的訊息將分發給所有訂閱這個訊息的消費者,而普通的模式是:一批訊息可以被多個人共同消費,如consumer1可能消費1,3,5記錄,而consumer2可能消費的是2,4,6這種模組就是共同消費模組;而今天說的是廣播訊息,它是指一些訊息同時被推送到多個訂閱者,而這些訂閱者收到的訊息都是完整的,如consumer1收到的會是1,2,3,4,5,6,而consumer2回到的也會是1,2,3,4,5,6,這種就像廣播一樣,把訊息廣播給多人!

當然只有在訊息到達伺服器之前訂閱的消費者才能收到,未訂閱的消費組是無法收到訊息的

訊息佇列的模式

一.簡介 訊息佇列包括兩種模式,點對點模式 point to point queue 和發布 訂閱模式 publish subscribe,topic 二.點對點模式 點對點模式包括三個角色 訊息佇列 傳送者 生產者 接收者 消費者 訊息傳送者生產訊息傳送到queue中,然後訊息接收者從queue中...

訊息佇列屬性及常見訊息佇列介紹

訊息佇列是在訊息的傳輸過程中儲存訊息的容器,用於接收訊息並以檔案的方式儲存,乙個佇列的訊息可以同時被多個訊息消費者消費。分布式訊息服務dms則是分布式的佇列系統,訊息佇列中的訊息分布儲存,且每條訊息儲存多個副本,以實現高可用性,如下圖所示。一般來說,訊息佇列具有如下屬性 訊息順序 普通佇列支援 分割...

MQ訊息佇列的常見用法

訊息佇列mq是分布式中重要的元件 目前常見的訊息佇列有三種 activemq,rabbitmq,kafka 今天我想來梳理一下mq訊息佇列的具體常見用法 1.非同步處理 使用者註冊之後,需要發簡訊和加積分,註冊資訊寫入資料庫後,通過非同步訊息,讓簡訊服務和積分服務去做它們的事,就提公升了 的質量 2...