RabbitMQ學習筆記(三) 發布與訂閱

2022-01-19 19:33:25 字數 2883 閱讀 9884

在我們使用手機傳送訊息的時候,即可以選擇給單個手機號碼傳送訊息,也可以選擇多個手機號碼,**訊息。

前面學習工作佇列的時候,我們使用的場景是乙個訊息只能被乙個消費者程式例項接收並處理,但是如果想要**訊息,僅憑之前學到的東西是實現不了的。

所以這裡需要引入rabbitmq的發布與訂閱模式。

rabbitmq通訊模型的核心思想是訊息生產者不會直接傳送訊息到訊息佇列,生產者程式也不知道他產生的訊息是否傳送到了乙個訊息佇列中。

實際上訊息生產者只會傳送訊息到乙個exchange物件。exchange是英語中交換的意思,這rabbitmq中它實際就是乙個訊息的中轉站,一方面它會接收所有與它繫結的生產者程式例項產生的訊息,一方面它有會根據自身的定義將訊息傳送到不同的訊息佇列中。不同的exchange型別決定了它會將訊息傳送到乙個特定的訊息佇列,還是多個訊息佇列。

exchange有四種型別

rabbitmq的發布和訂閱模式會使用fanout型別的exchange。

「fanout」在英語中是扇出、展開的意思,這種型別的exchange會將收到的一條訊息,傳送到多個訊息佇列中,而不同的訊息消費者例項監聽不同的訊息佇列,從而實現訊息廣播的功能。

我們可以使用channel物件中的exchangedeclare方法來宣告exchange

channel.exchangedeclare("logs", "fanout");

這個方法的第乙個引數是exchange的名稱,第二個引數是exchange的型別。

當我們呼叫channel物件的basicpublish方法傳送訊息的時候,我們可以設定exchange引數

public static void basicpublish(this imodel model, string exchange, string routingkey, ibasicproperties basicproperties, byte body);
例:

channel.basicpublish(exchange: "logs",

routingkey: "",

basicproperties: null,

body: body);

回想上一章工作佇列的例子,會發現之前才發布訊息的時候,我們有傳遞乙個空 exchange

channel.basicpublish(exchange: "",

routingkey: "work_queue",

basicproperties: properties,

body: body

);

那為什麼訊息會正確的傳送到訊息消費者程式例項呢?

這個其實是rabbitmq的乙個預設設定,當傳送訊息時,如果將exchange設定為空字串,系統會自動啟用乙個預設的exchange, 這個exchange會把訊息自動新增到第二個引數routeingkey指定的訊息佇列中(在上一章的例子中,即work_queue訊息佇列),所以訊息消費者程式例項才能正確的接收訊息。

當我們**訊息的時候,我們需要為每個訊息消費者例項建立乙個名字獨特的訊息佇列。

但是這裡我們只是關注**訊息,而不想去關注為訊息佇列起名字這件事情。

rabbitmq提供了一種臨時訊息佇列,來幫助我們簡化這個操作。

當我們在宣告訊息佇列的時候,不傳遞任何引數,生成的就是乙個臨時訊息佇列

channel.queuedeclare();
而如果想獲得這個訊息佇列的名字,我們可以從它的queuename屬性中獲取。

var queuename = channel.queuedeclare().queuename;
這個queuename是rabbitmq幫我們動態隨機生成,例:amq.gen-jzty20brgko-hjmujj0wlg.

當我們建立了乙個 exchange之後,我們需要設定exchange給那些訊息佇列傳送訊息,exchange和訊息佇列之間的關係叫做binding(繫結)

在channel物件中有乙個queuebind的方法來幫助我們完成這一操作

channel.queuebind(queue: queuename,

exchange: "logs",

routingkey: "");

現在我們修改一下我們之前的傳送訊息程式,將它改造成乙個訊息**器。

宣告乙個名為broadcast的exchange

修改傳送訊息部分的**,現在直接傳送到建立的exchange中

static void main(string args)

;using (var connection = factory.createconnection())

", message);}}

}

使用臨時訊息佇列

將exchange和臨時訊息佇列繫結

static void main(string args)

;using (var connection = factory.createconnection())

", message);

};channel.basicconsume(queue: queuename, autoack: true, consumer: consumer);

console.read();}}

}

我們啟動1個send程式,2個receive程式。

RabbitMQ(三) 發布訂閱

rabbitmq 三 發布訂閱 一 概述 rabbitmq的發布訂閱 publish subscribe 其將生產者和消費者進一步解耦,生產者生產訊息後,交付給交換機,消費者上線後,主動主動去佇列中取資料進行處理。該模式也符合上一節工作佇列中的ack 預取等規則。發布訂閱模式如下圖所示 二 交換機 ...

RabbitMQ學習系列 三 發布 訂閱

編寫生產者 編寫消費者 有幾個概念介紹一下 1 生產者 生產者是傳送訊息的使用者的應用程式2 路由 處理生產者訊息發到哪個佇列3 佇列 佇列是儲存訊息的緩衝器4 消費者 消費者是接收訊息的使用者的應用程式 exchangedeclare string exchange,string type,boo...

golang使用rabbitmq(三)發布 訂閱

rabbitmq訊息傳遞模型的核心是生產者永遠不會把訊息直接傳送到某個佇列上,甚至根本不知道訊息是否傳送到佇列上。生產者只向交換器傳送訊息。交換器來決定怎麼處理訊息,是把訊息傳遞到特定的佇列,還是把訊息傳到多個佇列,或者直接丟棄訊息,這一切取決於交換器的型別。交換器的型別有四種 direct,top...