架構 訊息中介軟體 基於記憶體的mq

2022-03-29 05:01:25 字數 1650 閱讀 7015

如果使用者的請求比較費時,可以考慮將使用者的請求資訊放到佇列中,立即返回給使用者處理中等資訊,這樣可以給使用者比較流暢的體驗,後端可以利用單獨的服務消費訊息,做到了解耦,提高了併發能力。

本文使用jdk為我們提供的阻塞佇列api,來實現乙個基於記憶體的簡單訊息佇列。主要涉及的介面blockingqueue,以及它的實現類arrayblockingqueue(陣列實現的)和linkedblockingqueue(鍊錶實現的)。

blockingqueue的主要方法

新增元素

put() //往佇列裡插入元素,如果佇列已經滿,則會一直等待直到隊列為空插入新元素,或者執行緒被中斷丟擲異常;

offer() //往佇列新增元素如果佇列已滿直接返回false,佇列未滿則直接插入並返回true;

add() //對offer()方法的簡單封裝.如果佇列已滿,丟擲異常new illegalstateexception("queue full");

刪除元素

remove() //方法直接刪除隊頭的元素;

take() //取出並刪除隊頭的元素,當隊列為空,則會一直等待直到佇列有新元素可以取出,或者執行緒被中斷丟擲異常;

pool() //取出並刪除隊頭的元素,當隊列為空,返回null;

peek() //直接取出隊頭的元素,並不刪除;

element() //對peek方法進行簡單封裝,如果隊頭元素存在則取出並不刪除,如果不存在丟擲異常nosuchelementexception();

基於記憶體的佇列,佇列的大小依賴於jvm記憶體的大小,一般如果是記憶體占用不大且處理相對較為及時的都可以採用此種方法。如果你在佇列處理的時候需要有失敗重試機制,那麼用此種佇列就不是特別合適了,可以使用基於資料庫的mq。

使用示例,簡單模仿基於生產者、消費者、訊息佇列模型的記憶體mq

*生產者模型

public class produce 

public produce(int id, blockingqueuequeue)

public void produce(string message)

}}

*消費者模型

public class consumer 

public consumer(blockingqueuequeue, int id)

public void consumer() catch (interruptedexception e)

}}, 0, 5, timeunit.seconds);

}}

*測試類

public class test  catch (interruptedexception e) 

for(int i=0;i<20;i++)

}}

ps:訊息佇列無論在分布式還是在高併發概念中,都是乙個非常重要的資料處理方式,本文只是我對訊息中介軟體這個技術棧的第一次試水,後面會陸續增加基於資料庫的mq,以及現在已經比較成熟商業中介軟體產品rabbitmq、rocketmq等的學習筆記。

訊息中介軟體 MQ

1 為什麼需要訊息佇列mq 因為在高併發環境下,由於來不及同步處理,請求往往會發生阻塞,比如 大量的insert,update語句請求同時到達mysql,直接導致無數的行鎖鎖表,甚至最後的請求會堆積過多,從而觸發too many connections錯誤。通過使用訊息佇列,可以非同步的處理請求,從...

MQ訊息中介軟體

mq是message queue,就是訊息佇列。是進行通訊的中介軟體產品,可以把訊息佇列比作是乙個存放訊息的容器,呼叫的方法就是訊息,把方法存到佇列中然後從佇列中取出方法去執行。目前使用較多的訊息佇列有activemq,rabbitmq,kafka,rocketmq。訊息佇列的作用有非同步 削峰 解...

訊息中介軟體MQ

訊息中介軟體利用高效可靠的訊息傳遞機制進行平台無關的資料交流,並基於資料通訊來進行分布式系統的整合。通過提供訊息傳遞和訊息排隊模型,它可以在分布式環境下擴充套件程序間的通訊。對於訊息中介軟體,常見的角色大致也就有producer 生產者 consumer 消費者 訊息佇列中介軟體是分布式系統中重要的...