RabbitMQ 測試問題

2021-06-18 17:33:16 字數 4726 閱讀 1498

使用eventlet併發consumer指令碼:

eventlet.monkey_patch(all=true)

msg_per_queue = 50

queue_num = 10

rabbit_host = '10.23.54.150:5672'

class consumer():

def __init__(self, count):

self.queue_name = 'test_queue%d' % count

self.i=0

self.a=none

self.b=none

server =

self.conn = amqp.connection( server['host'],

userid=server['userid'],

password=server['password'],

ssl=server['ssl'])

self.ch = self.conn.channel()

self.ch.access_request('/data', active=true, read=true)

self.ch.exchange_declare(exchange='rabbit_test_concurrent',

type='topic', durable=true,

auto_delete=false)

self.ch.queue_declare(queue=self.queue_name, durable=true, 

exclusive=false, auto_delete=false)

self.ch.queue_bind(queue=self.queue_name, 

exchange='rabbit_test_concurrent',

routing_key=self.queue_name)

def showmsg(self, msg):

if self.i == 0:

self.a = time.time()

self.i = self.i + 1

msg.channel.basic_ack(msg.delivery_tag)

if self.i == msg_per_queue:

self.b =time.time()

msg.channel.basic_cancel(msg.consumer_tag)

if msg.body == 'quit':

msg.channel.basic_cancel(msg.consumer_tag)

def __call__(self):

self.ch.basic_consume(self.queue_name, callback=self.showmsg)

try:

while self.ch.callbacks:

self.ch.wait()

self.ch.close()

self.conn.close()

except keyboardinterrupt, e:

print 'hello'

raise e

if __name__ == '__main__':

consumers =

pool = eventlet.greenpool()

try:

for i in range(1, queue_num+1):

consumer = consumer(i)

pool.spawn_n(consumer)

pool.waitall()

except keyboardinterrupt :

msg_sum = sum(consumer.i for consumer in consumers)

print 'the sum of received  

messages is %d' % msg_sum

exit()

a = min(consumer.a for consumer in consumers)

b = max(consumer.b for consumer in consumers)

if a and b:

print 'the time used is %f' % (b-a)

執行出現下面問題:

traceback (most recent call last):

file "consumer_concurrent_eventlet.py", line 68, in

consumer = consumer(i)

file "consumer_concurrent_eventlet.py", line 27, in __init__

ssl=server['ssl'])

file "/usr/lib/python2.6/site-packages/amqplib-1.0.2-py2.6.egg/amqplib/client_0_8/connection.py", line 138, in __init__

self._x_start_ok(d, login_method, login_response, locale)

file "/usr/lib/python2.6/site-packages/amqplib-1.0.2-py2.6.egg/amqplib/client_0_8/connection.py", line 719, in _x_start_ok

self._send_method((10, 11), args)

file "/usr/lib/python2.6/site-packages/amqplib-1.0.2-py2.6.egg/amqplib/client_0_8/abstract_channel.py", line 76, in _send_method

method_sig, args, content)

file "/usr/lib/python2.6/site-packages/amqplib-1.0.2-py2.6.egg/amqplib/client_0_8/method_framing.py", line 252, in write_method

self.dest.write_frame(1, channel, payload)

file "/usr/lib/python2.6/site-packages/amqplib-1.0.2-py2.6.egg/amqplib/client_0_8/transport.py", line 165, in write_frame

frame_type, channel, size, payload, 0xce))

file "/usr/lib/python2.6/site-packages/eventlet-0.13.0-py2.6.egg/eventlet/greenio.py", line 307, in sendall

tail = self.send(data, flags)

file "/usr/lib/python2.6/site-packages/eventlet-0.13.0-py2.6.egg/eventlet/greenio.py", line 293, in send

total_sent += fd.send(data[total_sent:], flags)

socket.error: [errno 104] connection reset by peer

將**進行如下修改,貌似可解決問題:

if __name__ == '__main__':

consumers =

pool = eventlet.greenpool()

try:

for i in range(1, queue_num+1):

consumer = consumer(i)

pool.spawn_n(consumer)

pool.waitall()

except keyboardinterrupt :

msg_sum = sum(consumer.i for consumer in consumers)

print 'the sum of received  

messages is %d' % msg_sum

exit()

改為如下:

if __name__ == '__main__':

consumers =

pool = eventlet.greenpool()

try:

for i in range(1, queue_num+1):

consumer = consumer(i)

print '%d connection is open' % i

for consumer in consumers:

pool.spawn_n(consumer)

pool.waitall()

except keyboardinterrupt :

msg_sum = sum(consumer.i for consumer in consumers)

print 'the sum of received  

messages is %d' % msg_sum

exit()

當發現問題時,第一反映是伺服器端可能出現了tcp連線數限制問題,覺得自己指令碼正常。通過列印標誌得到原來好多鏈結並沒有connection成功。

RabbitMQ的面試問題

為什麼使用訊息佇列啊?訊息佇列都有什麼優點和缺點?有點有上面說的解耦,非同步,削峰 缺點呢?因為插入了mq這個訊息中介軟體,如果mq訊息中介軟體掛掉了,那麼像上面說的解耦的情況,服務a的訊息給bcdef等伺服器就傳送不出去了,如果mq沒有掛掉bc都從a中獲取了訊息,修改了資料庫的資料,但是d服務的監...

RabbitMQ常見的面試問題

rabitmq是什麼?rabbitmq是一款使用erlang語言開發,基於amqp協議的訊息中介軟體.rabbitmq的應用場景是什麼?1.非同步通知 發簡訊,郵件的時候,採用非同步處理的方式,客戶無需等待通知結果 2.流量削鋒 在電商中大秒殺活動中,採用佇列長度來控制請求的數量,超過佇列的長度,則...

壓力測試問題

環境 兩台虛機配置,千m網絡卡 a 8cpu,32g記憶體 應用伺服器 b 2cpu,8g記憶體 資料庫 壓力測試資料 200使用者,每秒響應160 170次左右,cpu占用10 左右,記憶體占用穩定,始終無法提公升,網路使用率4 資料庫伺服器,cpu 2 記憶體占用穩定 資料庫一切正常,伺服器cp...