Python通過RabbitMQ實現RPC

2022-04-18 10:21:21 字數 3059 閱讀 8083

#

!/usr/bin/env python

#-*- coding:utf-8 -*-

import

pika

import

uuid

import

time

class

fibonaccirpcclient(object):

def__init__

(self):

#生成socket

self.connection = pika.blockingconnection(pika.connectionparameters('

localhost'))

#生成管道

self.channel =self.connection.channel()

#宣告乙個隨機queue,exclusive=true會在此queue的消費者斷開後,自動將queue刪除

result = self.channel.queue_declare(exclusive=true)

#獲取隨機queue名

self.callback_queue =result.method.queue

#定義收到訊息後的動作

self.channel.basic_consume(self.on_response, #

**函式on_response

no_ack=true,

queue=self.callback_queue) #

獲取隨機queue名

defon_response(self, ch, method, props, body):

if self.corr_id == props.correlation_id: #

判斷uuid是否是否一致

self.response = body #

佇列返回

defcall(self, n):

self.response =none

self.corr_id = str(uuid.uuid4()) #

生成uuid,等會傳送給服務端

#傳送訊息給服務端

self.channel.basic_publish(exchange=''

, routing_key='

rpc_queue

', #

路由鍵 properties=pika.basicproperties(reply_to=self.callback_queue, #

告訴服務端將返回發到哪個佇列

correlation_id=self.corr_id),

body=str(n)) #

傳送的訊息

while self.response is

none:

self.connection.process_data_events()

#非阻塞版的start_consuming(),如果收到訊息就執行on_response**函式

print("

no msg....")

time.sleep(0.5) #

這裡可以執行其他命令

return int(self.response) #

返回結果

#生成例項

fibonacci_rpc =fibonaccirpcclient()

print("

[x] requesting fib(30)")

#呼叫call函式

response = fibonacci_rpc.call(30)

print("

[x] got %r

" % response)

server端**:

#

!/usr/bin/env python

#-*- coding:utf-8 -*-

import

pika

import

time

#生成socket

connection = pika.blockingconnection(pika.connectionparameters('

localhost'))

#生成管道

channel =connection.channel()

#宣告乙個queue防止啟動報錯

channel.queue_declare(queue='

rpc_queue')

deffib(n):

if n ==0:

return

0

elif n == 1:

return 1

else

:

return fib(n - 1) + fib(n - 2)

defon_request(ch, method, props, body):

n =int(body)

print("

[.] fib(%s)

" %n)

response =fib(n)

ch.basic_publish(exchange=''

, routing_key=props.reply_to,

properties=pika.basicproperties(correlation_id=props.correlation_id),

body=str(response))

ch.basic_ack(delivery_tag=method.delivery_tag) #

回覆確認訊息

#處理完這條再發下一條

channel.basic_qos(prefetch_count=1)

#定義收到訊息動作

channel.basic_consume(on_request,queue='

rpc_queue')

channel.start_consuming()

erlang jcl遠端除錯 rabbitmq

job control mode jcl in which jobs can be started,stopped,detached or connected.only the current job can communicate with the shell.通過jcl,我們可以遠端接入乙個er...

mac 使用docker 安裝 rabbitmq

專案要用到rabbitmq 官網說brew install 但是brew update就半天都不成功,突然想起了docker,以前用過一點點,就試著來了一下 docker search rabbitmq management docker pull rabbitmq management docke...

RabbitMQ實戰 什麼是RabbitMQ

mq 首先我們說下mq,mq全稱為message queue,即訊息佇列,是一種應用程式對應用程式的通訊方法。其特點就是一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。訊息 message brokers 從發布者 publishers 亦稱生產者 producers 那兒接受...