RabbitMQ的工作佇列和路由

2021-09-06 14:19:21 字數 4182 閱讀 6151

工作佇列:working queue

工作佇列這個概念與簡單的傳送/接收訊息的區別就是:接收方接收到訊息後,可能需要花費更長的時間來處理訊息,這個過程就叫乙個work/task。

幾個概念

分配:多個接收端接收同乙個queue時,如何分配?

訊息確認:server端如何確定接收方的work已經對訊息進行了完整的處理?

訊息持久化:傳送方、服務端queue如何對未處理的訊息進行磁碟持久化?

round-robin分配

多個接收端接收同乙個queue時,採用了round-robin分配演算法,即輪叫排程——依次分配給各個接收方。

訊息確認

對於耗時的work,可以先關閉自動訊息確認,在work完成後,再手動發回確認。

channel.basicconsume("hello",false/*關閉自動訊息確認*/,consumer);

// ...work完成後

channel.basicack(delivery.getenvelope().getdeliverytag(), false);

持久化

1. server端的queue持久化

注意的是,如果已經宣告了同名非持久化的queue,則再次宣告無效。

傳送方和接收方都需要指定該引數。

boolean durable = true;

channel.queuedeclare("task_queue", durable, false, false, null); 

2. message持久化

channel.basicpublish("", "task_queue", messageproperties.

persistent_text_plain,message.getbytes());

負載分配

為了解決各個接收端工作量相差太大的問題(有的一直busy,有的空閒比較多),突破round-robin。

int prefetchcount = 1;

channel.basicqos(prefetchcount);

意思為,最多為當前接收方傳送一條訊息。如果接收方還未處理完畢訊息,還沒有回發確認,就不要再給他分配訊息了,應該把當前訊息分配給其它空閒接收方。

場景示例:訊息傳送方傳送了型別為[error][info]的兩種訊息,寫磁碟的訊息接受者只接受error型別的訊息,console列印的接收兩者。

(上圖採用了不同顏色來作為routingkey)

傳送方

connectionfactory factory = new

connectionfactory();

factory.sethost("localhost");

connection connection =factory.newconnection();

channel channel =connection.createchannel();

channel.exchangedeclare(exchange_name, "direct"/*

exchange型別為direct

*/);

channel.basicpublish(exchange_name, "info"/*

*/, null

, message.getbytes());

channel.close();

connection.close();

接收方

connectionfactory factory = new

connectionfactory();

factory.sethost("localhost");

connection connection =factory.newconnection();

channel channel =connection.createchannel();

channel.exchangedeclare(exchange_name, "direct"/*

exchange型別為direct

*/);

//建立匿名queue

string queuename =channel.queuedeclare().getqueue();

"error");

channel.queuebind(quuename,exchange_name,"info");

queueingconsumer consumer = new

queueingconsumer(channel);

channel.basicconsume(queuename,

true

, consumer);

queueingconsumer.delivery delivery = consumer.nextdelivery(); //

blocking...

string message = new

string(delivery.getbody());

string routingkey = delivery.getenvelope().getroutingkey(); //

* 表示乙個word;

# 表示0個或者多個word;

傳送方

connectionfactory factory = new

connectionfactory();

factory.sethost("localhost");

connection connection =factory.newconnection();

channel channel =connection.createchannel();

channel.exchangedeclare(exchange_name, "topic"/*

exchange型別

*/);

channel.basicpublish(exchange_name, "***.yyy"/*

*/, null

, message.getbytes());

channel.close();

connection.close();

接收方

connectionfactory factory = new

connectionfactory();

factory.sethost("localhost");

connection connection =factory.newconnection();

channel channel =connection.createchannel();

channel.exchangedeclare(exchange_name, "topic"/*

exchange型別

*/);

//建立匿名queue

string queuename =channel.queuedeclare().getqueue();

"*.yyy");

queueingconsumer consumer = new

queueingconsumer(channel);

channel.basicconsume(queuename,

true

, consumer);

queueingconsumer.delivery delivery = consumer.nextdelivery(); //

blocking...

string message = new

string(delivery.getbody());

string routingkey = delivery.getenvelope().getroutingkey(); //

refs

rabbitMQ工作佇列

簡介 傳送耗時的任務給多個工作者,直到任務完成,返回給mq資訊,mq刪除佇列中的訊息。如果沒有收到返回資訊,就斷掉了,mq重新傳送該條資訊 data implode array slice argv,1 if empty data data hello world msg new amqpmessa...

RabbitMQ 工作佇列

rabbitmq是訊息 它接收資訊和 資訊。你可以把他考慮成乙個郵局。當你講郵寄的信放在郵局時,你可以確定郵差先生或者女士會把郵件最終送到你的收件人手中。當然郵局和rabbitmq最大的區別,rabbitmq不接受紙張,它只接收,儲存,二進位制的資料訊息快。下面講一些rabbitmq中的術語 注意 ...

RabbitMQ工作佇列

工作佇列也叫任務佇列,主要思想就是避免立即執行資源密集型任務,必須等待完成,才能繼續下乙個任務,你可以執行多個工人,佇列裡的工作他們可以共同不重複的完成。1 佇列優點之一就是能夠輕鬆平行的工作。如果積壓工作,我們可以增加更多的工人。預設情況下,rabbitmq將按順序將每條訊息傳送給下乙個工作者。平...