zmq是基於tcp實現的嗎 zmq模組的理解和使用

2021-10-16 23:13:46 字數 4782 閱讀 6227

最近專案中接觸到zeromq, 內部實現挺複雜的,沒時間深入了解,簡單記錄下使用方法吧,有時間會來填坑。 官方指導文件

專案主要用zeromq在多個ip主機上的服務間進行專案通訊,直接用scoket也可以實現,但比較費時費力,zeromq建立在socket的基礎上,提供了一套更加簡單強大的api,可以快速搭建起跨程序,跨ip等的通訊網路。很多文章中都提到了socket只能實現一對一的通訊,zeromq可以實現多對多的連線,而且有三種模式供選擇,可以根據業務需要,進行選擇和使用。

zeromq的三種通訊模式分別是:request-reply,  publisher-subscriber,  parallel pipeline

python安裝zmq模組:pip install pyzmq

1. request-reply(應答模式)

應答模式特點:

1. 客戶端提出請求,服務端必須回答請求,每個請求只回答一次

2.  客戶端沒有收到答覆前,不能再次進行請求

3. 可以有多個客戶端提出請求,服務端能保證各個客戶端只接收到自己的答覆

4. 如果服務端斷掉或者客戶端斷掉會產生怎樣的影響?

如果是客戶端斷掉,對服務端沒有任何影響,如果客戶端隨後又重新啟動,那麼兩方繼續一問一答,但是如果是服務端斷掉了,就可能會產生一些問題,這要看服務端是在什麼情況下斷掉的,如果服務端收是在回答完問題後斷掉的,那麼沒影響,重啟服務端後,雙發繼續一問一答,但如果服務端是在收到問題後斷掉了,還沒來得及回答問題,這就有問題了,那個提問的客戶端遲遲得不到答案,就會一直等待答案,因此不會再傳送新的提問,服務端重啟後,客戶端遲遲不發問題,所以也就一直等待提問。

python 實現客戶端和服務端**如下:

zmq_server.py

import zmq

context=zmq.context() #建立上下文

socket=context.socket(zmq.rep) #建立response服務端socket

socket.bind("tcp://*:5555") #socket繫結,*表示本機ip,埠號為5555,採用tcp協議通訊whiletrue:

message=socket.recv()

print(type(message)) #接收到的訊息也會bytes型別(位元組)

print("收到訊息:{}".format(message))

socket.send(b"new message") #傳送訊息,位元組碼訊息

zmq_client.py

#coding:utf-8import zmq

context=zmq.context()

socket=context.socket(zmq.req)

socket.connect("tcp://localhost:5555")

socket.send(b"a message")

response=socket.recv()

print(response)

常用資料傳送api如下:

#傳送資料

socket.send_json(data) #data 會被json序列化後進行傳輸 (json.dumps)

socket.send_string(data, encoding="utf-8") #data為unicode字串,會進行編碼成子節再傳輸

socket.send_pyobj(obj) #obj為python物件,採用pickle進行序列化後傳輸

socket.send_multipart(msg_parts) # msg_parts, 傳送多條訊息組成的迭代器序列,每條訊息是子節型別,

# 如[b"message1", b"message2", b"message2"]

#接收資料

socket.recv_json()

socket.recv_string()

socket.recv_pyobj()

socket.recv_multipart()

2. publisher-subscriber (發布-訂閱模式)

publiser廣播訊息到所有客戶端,客戶端根據訂閱主題過濾訊息

python實現**如下, 其中publisher發布兩條訊息,第一條訊息的topic為client1, 被第乙個subscriber接收到;第二條訊息的topic為client2, 被第二個subscriber接收到。

注意的是subscriber在匹配時,並不是完全匹配的,訊息的topic為client1開頭的字串都會被匹配到,如果topic為"client1cient2", 也會被第乙個subscriber接收到

zmq_server.py

#coding:utf-8import zmq

context=zmq.context()

socket=context.socket(zmq.pub)

socket.bind("tcp://*:5555")

topic= ["client1", "client2"]whiletrue:for t intopic:

data= "message for {}".format(t)

msg= [t.encode("utf-8"), data.encode("utf-8")] #列表中的第一項作為訊息的topic,sub根據topic過濾訊息

print(msg)

socket.send_multipart(msg)

zmq_client1.py

#coding:utf-8import zmq

context=zmq.context()

socket=context.socket(zmq.sub)

socket.subscribe("client1") #訂閱主題topic為:client1

socket.connect("tcp://localhost:5555")

msg=socket.recv_multipart()

print(msg)

結果:zmq_client2.py

import zmq

context = zmq.context()

socket = context.socket(zmq.sub)

socket.subscribe("client2") #訂閱主題topic為:client2

socket.connect("tcp://localhost:5555")

msg = socket.recv_multipart()

print(msg)

結果:3. parallel pipeline(並行管道模式)

管道模式有三部分組成,如下圖所示,最左邊的producer通過push產生任務, 中間的consumer接收任務處理後**,最後result collector接收所有任務的結果。 相比於publisher-subscriber,多了乙個資料快取和處理負載的部分,當連線斷開,資料不會丟失,重連後資料繼續傳送到客戶端。

python實現producer, consumer, resultcollector

producer.py

import zmq

context=zmq.context()

socket=context.socket(zmq.push)

socket.bind("tcp://*:5577")for num in range(2000):

work_message=

socket.send_json(work_message)

consumer.py

import random

import zmq

context=zmq.context()

consumer_id= random.randint(1, 1000)

#接收工作

consumer_receiver=context.socket(zmq.pull)

consumer_receiver.connect("tcp://localhost:5577")

#**結果

consumer_sender=context.socket(zmq.push)

consumer_sender.bind("tcp://*:5578")whiletrue:

msg=consumer_receiver.recv_json()

data= msg["num"]

result=

consumer_sender.send_json(result)

resultcollector.py

#coding:utf-8import zmq

context=zmq.context()

result_receiver=context.socket(zmq.pull)

result_receiver.connect("tcp://localhost:5578")

result=result_receiver.recv_json()

collecter_data={}for x in range(1000):if result['consumer_id'] incollecter_data:

collecter_data[result['consumer_id']] = collecter_data[result['consumer_id']] + 1

else:

collecter_data[result['consumer_id']] = 1

if x == 999:

print(collecter_data)

執行順序:

python producer.py

python consumer.py

python resultcollector.py

程式設計實現基於tcp的socket程式設計

server端 public class server socket.shutdowninput 關閉輸入流 4 獲取輸出流,響應客戶端的請求 outputstream os socket.getoutputstream printwriter pw new printwriter os 包裝為列印...

用C 實現基於用C 實現基於TCP協議的網路通訊

tcp 協議是乙個基本的網路 協議,基本上所有的網路服務都是基於 tcp協議的,如http,ftp等等,所以要了解網路程式設計就必須了解基於 tcp協議的程式設計。然而 tcp協議是乙個龐雜的體系,要徹底的弄清楚它的實現不是一天兩天的功夫,所幸的是在.net framework環境下,我們不必要去追...

TCP是怎麼實現可靠傳輸的

tcp協議傳輸的特點主要是面向位元組流 傳輸可靠 面向連線。答 tcp協議保證資料傳輸可靠性的方式主要有 序列號 tcp傳輸時將每個位元組的資料都進行了編號,即序列號。確認應答 tcp傳輸過程中,每次接收方收到資料後,都會對傳輸方進行確認應答。也就是傳送ack報文。這個ack報文當中帶有對應的確認序...