rabbitmq 理論 主題交換機

2021-08-19 18:48:43 字數 4036 閱讀 2899

儘管直連交換機能夠改善我們的系統,但是它也有它的限制 —— 沒辦法基於多個標準執行路由操作。

在我們的日誌系統中,我們不只希望訂閱基於嚴重程度的日誌,同時還希望訂閱基於傳送**的日誌。unix工具syslog就是同時基於嚴重程度-severity (info/warn/crit...) 和 裝置-facility (auth/cron/kern...)來路由日誌的。

如果這樣的話,將會給予我們非常大的靈活性,我們既可以監聽**於「cron」的嚴重程度為「critical errors」的日誌,也可以監聽**於「kern」的所有日誌。

為了實現這個目的,接下來我們學習如何使用另一種更複雜的交換機 —— 主題交換機。

傳送到主題交換機(topic exchange)的訊息不可以攜帶隨意什麼樣子的路由鍵(routing_key),它的路由鍵必須是乙個由.分隔開的詞語列表。這些單詞隨便是什麼都可以,但是最好是跟攜帶它們的訊息有關係的詞彙。以下是幾個推薦的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。詞語的個數可以隨意,但是不要超過255位元組。

繫結鍵也必須擁有同樣的格式。主題交換機背後的邏輯跟直連交換機很相似 —— 乙個攜帶著特定路由鍵的訊息會被主題交換機投遞給繫結鍵與之想匹配的佇列。但是它的繫結鍵和路由鍵有兩個特殊應用方式:

下邊用圖說明:

這個例子裡,我們傳送的所有訊息都是用來描述小動物的。傳送的訊息所攜帶的路由鍵是由三個單詞所組成的,這三個單詞被兩個.分割開。路由鍵裡的第乙個單詞描述的是動物的手腳的利索程度,第二個單詞是動物的顏色,第三個是動物的種類。所以它看起來是這樣的:..

我們建立了三個繫結:q1的繫結鍵為*.orange.*,q2的繫結鍵為*.*.rabbitlazy.#

這三個繫結鍵被可以總結為:

乙個攜帶有quick.orange.rabbit的訊息將會被分別投遞給這兩個佇列。攜帶著lazy.orange.elephant的訊息同樣也會給兩個佇列都投遞過去。另一方面攜帶有quick.orange.fox的訊息會投遞給第乙個佇列,攜帶有lazy.brown.fox的訊息會投遞給第二個佇列。攜帶有lazy.pink.rabbit的訊息只會被投遞給第二個佇列一次,即使它同時匹配第二個佇列的兩個繫結。攜帶著quick.brown.fox的訊息不會投遞給任何乙個佇列。

如果我們違反約定,傳送了乙個攜帶有乙個單詞或者四個單詞("orange"or"quick.orange.male.rabbit")的訊息時,傳送的訊息不會投遞給任何乙個佇列,而且會丟失掉。

但是另一方面,即使"lazy.orange.male.rabbit"有四個單詞,他還是會匹配最後乙個繫結,並且被投遞到第二個佇列中。

主題交換機

主題交換機是很強大的,它可以表現出跟其他交換機類似的行為

當乙個佇列的繫結鍵為 "#"(井號) 的時候,這個佇列將會無視訊息的路由鍵,接收所有的訊息。

*(星號) 和#(井號) 這兩個特殊字元都未在繫結鍵中出現的時候,此時主題交換機就擁有的直連交換機的行為。

接下來我們會將主題交換機應用到我們的日誌系統中。在開始工作前,我們假設日誌的路由鍵由兩個單詞組成,路由鍵看起來是這樣的:.emit_log_topic.py的**:

#!/usr/bin/env python

import pika

import sys

connection = pika.blockingconnection(pika.connectionparameters(

host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',

type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1

else

'anonymous.info'

message = ' '.join(sys.argv[2:]) or

'hello world!'

channel.basic_publish(exchange='topic_logs',

routing_key=routing_key,

body=message)

print

" [x] sent %r:%r" % (routing_key, message)

connection.close()

receive_logs_topic.py的**:

#!/usr/bin/env python

import pika

import sys

connection = pika.blockingconnection(pika.connectionparameters(

host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',

type='topic')

result = channel.queue_declare(exclusive=true)

queue_name = result.method.queue

binding_keys = sys.argv[1:]

ifnot binding_keys:

print >> sys.stderr, "usage: %s [binding_key]..." % (sys.argv[0],)

sys.exit(1)

for binding_key in binding_keys:

channel.queue_bind(exchange='topic_logs',

queue=queue_name,

routing_key=binding_key)

print

' [*] waiting for logs. to exit press ctrl+c'

defcallback

(ch, method, properties, body):

print

" [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,

queue=queue_name,

no_ack=true)

channel.start_consuming()

執行下邊命令 接收所有日誌:

python receive_logs_topic.py "#"

執行下邊命令 接收來自」kern「裝置的日誌:

python receive_logs_topic.py "kern.*"

執行下邊命令 只接收嚴重程度為」critical「的日誌:

python receive_logs_topic.py "*.critical"

執行下邊命令 建立多個繫結:

python receive_logs_topic.py "kern.*" "*.critical"

執行下邊命令 傳送路由鍵為 "kern.critical" 的日誌:

python emit_log_topic.py "kern.critical" "a critical kernel error"

執行上邊命令試試看效果吧。另外,上邊**不會對路由鍵和繫結鍵做任何假設,所以你可以在命令中使用超過兩個路由鍵引數。

rabbitmq學習5 主題交換機

require once dir vendor autoload.php use phpamqplib connection amqpstreamconnection use phpamqplib message amqpmessage connection new amqpstreamconnec...

RabbitMQ 交換機模式

在說正題之前先解釋一下交換機模式是個籠統的稱呼,它不是乙個單獨的模式 包括了訂閱模式,路由模式和主題模式 交換機模式是乙個比較常用的模式,主要是為了實現資料的同步。首先,說一下訂閱模式,就和字面上的意思差不多主要就是乙個生產者,多個消費者,同乙個訊息被多個消費者獲取,先看一下官網的圖示 整體執行過程...

RabbitMQ之Exchange交換機

rabbitmq中的exchange的作用 訊息佇列,訊息通過傳送和exchange之後最終到達的地方,到達queue的訊息及進入了等待消費的狀態。每個訊息都會被傳送到乙個或多個佇列。佇列的常用屬性name 佇列的名稱 durability 是否需要持久化,true為持久化 auto delete ...