RabbitMQ實戰 理解訊息通訊

2022-04-11 13:40:58 字數 3446 閱讀 8103

rabbitmq是乙個開源的訊息**和佇列伺服器,可以通過基本協議在完全不同的應用之間共享資料,可以將作業排隊以便讓分布式服務進行處理。

本篇介紹下訊息通訊,首先介紹基礎概念,將這些概念對映到amqp協議,然後介紹訊息持久化、傳送方確認模式等訊息可靠性保證。

通過本篇介紹,你會了解到:

訊息通訊概念

此部分的介紹,會牽涉到amqp的元素,如果之前沒接觸過的,可以結合下面的「amqp元素」進行理解。

訊息訊息是傳輸的主體,訊息包括兩部分:有效載荷(payload)和標籤(label);有效載荷是要傳輸的資料,可以是任何內容,比如json串、二進位制、自定義的資料協議等;標籤描述了有效載荷,並且rabbit用它來決定誰將獲得訊息的投遞。

可以與http協議模擬,http訊息頭部描述了訊息體的型別、大小等,http訊息體是要傳輸的資料,http服務端通過訊息頭部決定如何處理請求和資料。

生產者和消費者

生產者建立訊息,然後傳送到**伺服器(rabbitmq server),amqp只會用標籤表述這條訊息(乙個交換器名稱和可選的主題標記),rabbit伺服器會根據標籤把訊息傳送給訂閱的消費者。

消費者消費訊息,它會訂閱到佇列(queue)上,每當有訊息到達rabbitmq伺服器時,會傳送給消費者,消費者收到訊息時,會進行處理。

注意:消費者收到的訊息只包括有效載荷,所有不會知道是從**發來的。

連線和通道

要想發布或消費訊息,必須先與rabbitmq server建立一條tcp連線,建立tcp連線之後,要建立一條通道,通道是建立在真實tcp連線的虛擬連線。

amqp命令都是通過通道傳送出去的,每條通道會被指派乙個唯一的id,為什麼不直接通過tcp連線傳送amqp命令呢? 因為作業系統建立和銷毀tcp會話是很昂貴的,而且建立的連線數也有限。 通過引入通道,可以在連線上建立通道,而且通道是私密的,相互不受影響。

通道的概念還是有點抽象,後面專門寫一篇文章進行分析介紹,這裡簡單理解下吧。

amqp元素

amqp訊息路由有三部分組成:佇列、交換器和繫結,佇列是存放訊息的地方,交換器是決定不同的分發策略,繫結是佇列和交換器的橋梁,定義匹配規則。

生產者傳送訊息到交換器,交換器根據自身型別和繫結規則,將訊息存放在對應佇列中,然後將訊息傳送到監聽佇列的消費者。

如上圖:p為生產者,x為交換器,交換器型別為direct,根據不同的繫結規則(orange、black、green),分發給不同的佇列,c為消費者,從不同的佇列介紹訊息。

佇列消費者通過兩種方式從特定的佇列接收訊息:

basic.get會影響效能,推薦使用basic.consume來實現高吞吐量,因為其處理過程是先訂閱訊息,獲取單條訊息,再取取消訂閱。

如果佇列擁有多個消費者時,佇列收到的訊息將以迴圈的方式發給消費者,即多個消費者平均消費這些訊息。

另外,消費者接收到的每一條訊息都要進行確認,必須通過basic.ack命令向rabbitmq服務端傳送乙個確認。 也可以設定auto_ack為true,只要消費者接收到訊息,就自動視為確認,不過不建議這樣,因為接收到不代表業務邏輯處理成功。 服務端接收到確認後,會從佇列中刪除對應訊息。

還有一種場景,在接收到訊息後,如果不想處理,可以通過下面方式處理:

最後來介紹下如何建立佇列,首先明確下是生成者還是消費者建立,關鍵點是:生產者能否承擔起丟失訊息,因為發出去的訊息如果路由到了不存在的佇列,rabbit會忽略它們。所以,建議生成者和消費者都嘗試去建立佇列,可以通過設定queue.declare的passive選項設定為ture來判斷佇列是否存在,如果不存在會返回乙個錯誤。

通過queue.declare命令來建立佇列,有一些選項說明下:

queuedeclare(string queue, 

boolean durable,

boolean exclusive,

maparguments);

交換器和繫結

交換器有四種型別:direct、fanout、topic、headers,其中headers匹配訊息的header而非路由鍵,不太實用,就不詳細介紹了。

第一種:direct交換器

direct交換器比較簡單,如果和路由鍵完全匹配的話,就會投遞到對應的佇列:

channel.exchangedeclare(exchange_name, "direct");

channel.queuebind(queuename, exchange_name, routingkey);

伺服器預設包含乙個空白字串名稱的預設路由器,當宣告乙個佇列時,會自定繫結到預設交換器,並以佇列名稱作為路由鍵。

第二種:fanout交換器

fanout交換器,不處理路由鍵,只需要簡單的將佇列繫結到交換機上,為會每個消費者自動生成乙個隨機佇列,所有的消費者都會收到所有訊息。

channel.exchangedeclare(exchange_name, "fanout");
第三種:topic交換器

topic交換器,將路由鍵和某模式進行匹配,此時佇列需要繫結要乙個模式上。

channel.exchangedeclare(exchange_name, "topic");

channel.queuebind(queuename, exchange_name, "*.orange.*");

關於模式,符號#匹配乙個或多個詞,符號匹配乙個詞,因此kfs.#能夠匹配到kfs.session.message,但是audit.只會匹配到audit.session。

虛擬主機

每個rabbitmq伺服器都能建立虛擬訊息伺服器,稱為虛擬主機(vhost),每個rabbitmq本質上是乙個mini版的rabbitmq伺服器,擁有自己的佇列、交換器、繫結,還有自己的許可權機制。

連線時,必須制定vhost,rabbitmq包含了預設的vhost:」/」。當建立乙個使用者時,會被指派給至少乙個vhsot,並且相互隔離。

vhost不能通過amqp協議建立,需要使用rabbitmqctl工具建立。

訊息持久化和傳送方確認模式

如果沒有持久化,重啟rabbitmq後,佇列、交換器都會消失,rabbitmq提供了持久化的功能,需要滿足以下三個條件:

當發布一條持久化訊息到持久化交換器上時,rabbit會在訊息提交到日誌檔案後才會傳送響應,所有會損失效能,所以,只對重要資料持久化即可。

考慮這種情況:由於發布訊息後,不返回任何資訊給生產者,如何只對伺服器已經持久化到硬碟了呢,可能在傳輸過程中丟失,或者持久化前伺服器宕機,導致訊息丟失。

rabbitmq通過「傳送方確認模式」來解決上面的問題。首先,需要將通道設定成confirm模式,這樣所有在通道上發布的訊息都會被指派乙個唯一的id號,一旦訊息被投遞到所有匹配的佇列或持久化到磁碟,會傳送乙個確認訊息給生產者。

通過本篇的介紹,對rabbit的訊息模型有了整體了解

RabbitMQ實戰 理解訊息通訊

前段時間總結完了 深入淺出mybatis 系列,對mybatis有了更全面和深入的了解,在掘金社群也收到了一些博友的喜歡,很高興。另外,短暫的陪產假就要結束了,小寶也二周了,下周二就要投入工作了,希望自己盡快調整過來,加油努力。從本篇開始總結 rabbitmq實戰 系列的閱讀筆記,rabbitmq是...

RabbitMQ實戰 訊息確認機制之訊息的正確消費

上節中我們講了如何確保訊息的準確發布,今天我們來看看如何確保訊息的正確消費。在之前的基礎上我們對消費者 倉庫服務 進行完善。所以,首先我們將ack的方式設定為手動 spring rabbitmq host xx port 5672 username x password x listener dir...

訊息中介軟體RabbitMQ 實戰一

1.安裝erlang環境 apt get install erlang yum install erlang 2.安裝rabbitmq apt get install rabbitmq server 3.建立使用者 自己編譯rabbitmq server下的操作 cd opt rabbitmq se...