訊息佇列RabbitMQ入門介紹

2021-06-21 06:26:44 字數 3650 閱讀 4194

(一)基本概念

rabbitmq是流行的開源訊息佇列系統,用erlang語言開發。我曾經對這門語言挺有興趣,學過一段時間,後來沒堅持。rabbitmq是amqp(高階訊息佇列協議)的標準實現。如果不熟悉amqp,直接看rabbitmq的文件會比較困難。不過它也只有幾個關鍵概念,這裡簡單介紹。

rabbitmq的結構圖如下:

幾個概念說明:

broker:簡單來說就是訊息佇列伺服器實體。

exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。

queue:訊息佇列載體,每個訊息都會被投入到乙個或多個佇列。

binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起來。

routing key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。

vhost:虛擬主機,乙個broker裡可以開設多個vhost,用作不同使用者的許可權分離。

producer:訊息生產者,就是投遞訊息的程式。

consumer:訊息消費者,就是接受訊息的程式。

channel:訊息通道,在客戶端的每個連線裡,可建立多個channel,每個channel代表乙個會話任務。

訊息佇列的使用過程大概如下:

(1)客戶端連線到訊息佇列伺服器,開啟乙個channel。

(2)客戶端宣告乙個exchange,並設定相關屬性。

(3)客戶端宣告乙個queue,並設定相關屬性。

(4)客戶端使用routing key,在exchange和queue之間建立好繫結關係。

(5)客戶端投遞訊息到exchange。

exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到乙個或多個佇列裡。

exchange也有幾個型別,完全根據key進行投遞的叫做direct交換機,例如,繫結時設定了routing key為」abc」,那麼客戶端提交的訊息,只有設定了key為」abc」的才會投遞到佇列。對key進行模式匹配後進行投遞的叫做topic交換機,符號」#」匹配乙個或多個詞,符號」*」匹配正好乙個詞。例如」abc.#」匹配」abc.def.ghi」,」abc.*」只匹配」abc.def」。還有一種不需要key的,叫做fanout交換機,它採取廣播模式,乙個訊息進來時,投遞到與該交換機繫結的所有佇列。

rabbitmq支援訊息的持久化,也就是資料寫在磁碟上,為了資料安全考慮,我想大多數使用者都會選擇持久化。訊息佇列持久化包括3個部分:

(1)exchange持久化,在宣告時指定durable => 1

(2)queue持久化,在宣告時指定durable => 1

(3)訊息持久化,在投遞時指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。如果exchange和queue兩者之間有乙個持久化,乙個非持久化,就不允許建立繫結。

(二)應用實際

我使用linux伺服器(ubuntu 9.10 64位),安裝rabbitmq非常方便。

先執行如下命令安裝erlang:

apt-get install erlang-nox

dpkg -i rabbitmq-server_2.6.1-1_all.deb

安裝完後,使用

/etc/init.d/rabbitmq-server start|stop|restart

來啟動、停止、重啟rabbitmq。

在正式應用之前,我們先在rabbitmq裡建立乙個vhost,加乙個使用者,並設定該使用者的許可權。

使用rabbitmqctl客戶端工具,在根目錄下建立」/pyhtest」這個vhost:

rabbitmqctl add_vhost /pyhtest

建立乙個使用者名稱」pyh」,設定密碼」pyh1234″:

rabbitmqctl add_user pyh pyh1234

設定pyh使用者對/pyhtest這個vhost擁有全部許可權:

rabbitmqctl set_permissions -p /pyhtest pyh 「.*」 「.*」 「.*」

後面三個」*」代表pyh使用者擁有對/pyhtest的配置、寫、讀全部許可權

設定好後,開始程式設計,我用perl寫乙個訊息投遞程式(producer):

#!/usr/bin/perl

use strict;

use net::rabbitmq;

use uuid::tiny;

my $channel = 1000; # channel id,可以隨意指定,只要不衝突

my $queuename = 「pyh_queue」; # 佇列名

my $exchange = 「pyh_exchange」; # 交換機名

my $routing_key = 「test」; # routing key

my $mq = net::rabbitmq->new(); # 建立乙個rabbitmq物件

$mq->connect(「localhost」, ); # 建立連線

$mq->channel_open($channel); # 開啟乙個channel

$mq->exchange_declare($channel, $exchange, ); # 宣告乙個持久化的交換機

$mq->queue_declare($channel, $queuename, ); # 宣告乙個持久化的佇列

$mq->queue_bind($channel, $queuename, $exchange, $routing_key); # 使用routing key在交換機和佇列間建立繫結

for (my $i=0;$i<10000000;$i++) , ); # 將訊息結合key以持久化模式投遞到交換機

}$mq->disconnect(); # 斷開連線

訊息接受程式(consumer)大概如下:

#!/usr/bin/perl

use strict;

use net::rabbitmq;

my $channel = 1001;

my $queuename = 「pyh_queue」;

my $mq = net::rabbitmq->new();

$mq->connect(「localhost」, );

$mq->channel_open($channel);

while (1) , 「: 「, $hashref->,」\n」;

}$mq->disconnect();

consumer連線後只要指定佇列就可獲取到訊息。

上述程式共投遞1000萬條訊息,每條訊息36位元組(uuid),開啟持久化,共耗時17分多鐘(包括產生uuid的時間),每秒投遞訊息約9500條。測試機器是8g記憶體、8核志強cpu。

投遞完後,在/var/lib/rabbitmq/mnesia/rabbit@$/msg_store_persistent目錄,產生2g多的持久化訊息資料。在執行consumer程式後,這些資料都會消失,因為訊息已經被消費了。

RabbitMQ訊息佇列(一) 入門

1.架構簡介 rabbitmq主要分為服務端和客戶端兩大塊。服務端主要由服務節點 broker 構成,服務節點主要包含交換器 exchange 佇列 queue 客戶端向服務端傳送和接收資料,分為生產者 producer 消費者 consumer 架構圖 資料流程圖 2.訊息 訊息一般包含2個部分 ...

訊息佇列Rabbitmq

rabbitmq server rabbitmqctl reset rabbitmqctl stop rabbitmqctl stop rabbitmqctl list users rabbitmqctl list queues rabbitmqctl add user user name user...

訊息佇列RabbitMQ

這是乙個很嚴肅的問題。系統之間解除耦合,可以讓不同語言編寫的系統通訊互動 保證伺服器負載不會飆公升。高大上一點就是流量削峰。讓程式變成非同步,提高響應速度。把費時任務放到另乙個程序或執行緒去執行。redis實現 剛開始學習redis時,一看這個鍊錶不就是給佇列準備的嗎?所以,一心扎進去,要寫個佇列出...