RabbitMQ六種佇列模式 工作佇列模式

2022-06-05 21:06:09 字數 2910 閱讀 3274

rabbitmq六種佇列模式-簡單佇列

rabbitmq六種佇列模式-工作佇列[本文]

rabbitmq六種佇列模式-發布訂閱

rabbitmq六種佇列模式-路由模式

rabbitmq六種佇列模式-主題模式

上文我們了解了 rabbitmq 六種佇列模式中的簡單佇列,**也是非常的簡單,比較容易理解。

但是簡單佇列有個缺點,簡單佇列是一一對應的關係,即點對點,乙個生產者對應乙個消費者,按照這個邏輯,如果我們有一些比較耗時的任務,也就意味著需要大量的時間才能處理完畢,顯然簡單佇列模式並不能滿足我們的工作需求,我們今天再來看看工作佇列。

1. 什麼是工作佇列

2. **部分

2.1 生產者

2.2 消費者

3. 迴圈分發

3.1 啟動生產者

3.2 啟動兩個消費者

3.3 公平分發

4. 訊息持久化

4.1 問題背景

4.2 引數配置

5. 工作佇列總結

工作佇列:用來將耗時的任務分發給多個消費者(工作者)

主要解決問題:處理資源密集型任務,並且還要等他完成。有了工作佇列,我們就可以將具體的工作放到後面去做,將工作封裝為乙個訊息,傳送到佇列中,乙個工作程序就可以取出訊息並完成工作。如果啟動了多個工作程序,那麼工作就可以在多個程序間共享。

工作佇列也稱為公平性佇列模式,怎麼個說法呢?

迴圈分發,假如我們擁有兩個消費者,預設情況下,rabbitmq 將按順序將每條訊息傳送給下乙個消費者,平均而言,每個消費者將獲得相同數量的訊息,這種分發訊息的方式稱為輪詢。

看**吧。

2.1 生產者

建立50個訊息

public

class

producer2 

channel.close();

newconnection.close();}}

2.2 消費者

public

class

customer2_1  catch (exception e)  finally }};

/** 3.監聽佇列 */

channel.basicconsume(queue_name, false, defaultconsumer);}}

3.1 啟動生產者

3.2 啟動兩個消費者

在生產者中我們傳送了50條訊息進入佇列,而上方消費者啟**裡很明顯的看到輪詢的效果,就是每個消費者會分到相同的佇列任務。

3.3 公平分發

由於上方模擬的是非常簡單的訊息佇列的消費,假如有一些非常耗時的任務,某個消費者在緩慢地進行處理,而另乙個消費者則空閒,顯然是非常消耗資源的。

再舉乙個例子,乙個1年的程式設計師,跟乙個3年的程式設計師,分配相同的任務量,明顯3年的程式設計師處理起來更加得心應手,很快就無所事事了,但是3年的程式設計師拿著非常高的薪資!顯然3年的程式設計師應該承擔更多的責任,那怎麼辦呢?

公平分發。

其實發生上述問題的原因是 rabbitmq 收到訊息後就立即分發出去,而沒有確認各個工作者未返回確認的訊息數量,類似於tcp/udp中的udp,面向無連線。

因此我們可以使用 basicqos 方法,並將引數 prefetchcount 設為1,告訴 rabbitmq 我每次值處理一條訊息,你要等我處理完了再分給我下乙個。這樣 rabbitmq 就不會輪流分發了,而是尋找空閒的工作者進行分發。

關鍵性**:

/** 2.獲取通道 */

final channel channel = newconnection.createchannel();

channel.queuedeclare(queue_name, false, false, false, null);

/** 保證一次只分發一次 限制傳送給同乙個消費者 不得超過一條訊息 */

channel.basicqos(1);

4.1 問題背景

上邊我們提到的公平分發是由消費者收取訊息時確認解決的,但是這裡面又會出現被 kill 的情況。

當有多個消費者同時收取訊息,且每個消費者在接收訊息的同時,還要處理其它的事情,且會消耗很長的時間。在此過程中可能會出現一些意外,比如訊息接收到一半的時候,乙個消費者死掉了。

這種情況要使用訊息接收確認機制,可以執行上次宕機的消費者沒有完成的事情。

但是在預設情況下,我們程式建立的訊息佇列以及存放在佇列裡面的訊息,都是非持久化的。當rabbitmq死掉了或者重啟了,上次建立的佇列、訊息都不會儲存。

怎麼辦呢?

4.2 引數配置

引數配置一:生產者建立佇列宣告時,修改第二個引數為 true

/**3.建立佇列宣告 */

channel.queuedeclare(queue_name, true, false, false, null);

引數配置二:生產者傳送訊息時,修改第三個引數為messageproperties.persistent_text_plain

for (int i = 1; i <= 50; i++) 

1、迴圈分發:消費者端在通道上開啟訊息應答機制,並確保能返回接收訊息的確認資訊,這樣可以保證消費者發生故障也不會丟失訊息。

2、訊息持久化:伺服器端和客戶端都要指定佇列的持久化和訊息的持久化,這樣可以保證rabbitmq重啟,佇列和訊息也不會丟失。

3、公平分發:指定消費者接收的訊息個數,避免出現訊息均勻推送出現的資源不合理利用的問題。

案例**:

RabbitMq六種使用模式 3 訂閱發布模式

之前的例子都是講訊息直接傳送給指定的佇列 現在需要傳送給多個佇列,訂閱 發布模式 生產者不再將訊息直接發往佇列,而是發往exchange,然後exchange再傳送給在該exchange訂閱的佇列 exchange需要知道如何對接收到的訊息進行 exchange有四種型別 direct,topic,...

六種單列模式

確保某乙個類只有乙個例項.1 餓漢式單列 public class singleone 構造方法私有 禁止建立 private singleone 2 懶漢式單列 public class singletwo 同步方法 每次呼叫都同步 消耗資源 public static synchronized ...

jBPM的tasknode的六種模式

jbpm的tasknode的六種模式 last 預設值。當最後乙個任務完成時 signal 當 task node 沒有建立任務時,直接 signal。last wait 當最後乙個任務完成時 signal 當 task node 沒有建立任務時等待。first 當第乙個任務完成時 signal 當...