ZeroMQ通訊案例

2021-10-12 12:51:44 字數 3514 閱讀 1382

使用zeromq實現c++與python之間的通訊功能

(1)傳輸的輸入包括字串和資料

(2)使用發布訂閱模型(sub和pub)以及中介軟體proxy實現雙向通訊

(3)手動指定埠號(待完善)

建立發布端pub

# 建立context、驗證context是否成功

void* context_pub = zmq_ctx_new();

assert(context_pub != null);

# 建立socket埠 及 驗證

void* pubsocket = zmq_socket(context_pub, zmq_pub);

assert(pubsocket != null);

# 繫結本地ip位址,serverip是本地伺服器ip位址

string str1_ip = "tcp://"+serverip+":8887";

int ret = zmq_connect(pubsocket,str1_ip.c_str());

assert(ret == 0);

建立訂閱端sub

void* context_sub = zmq_ctx_new();

assert(context_sub != null);

void* sub_c1 = zmq_socket(context_sub, zmq_sub);

assert(sub_c1 != null);

// 此處的localhost要替換成伺服器端的ip

string str2_ip = "tcp://" + serverip + ":8080";

ret = zmq_connect(sub_c1, str2_ip.c_str());

assert(ret == 0);

發布端傳送資料

# 傳送字串(zmq_sndmore引數表示後續還有資料需要傳送,用於一次傳送多幀資料)

string channel = "127.0.0.1";

int ret = zmq_send(pubsocket, channel.c_str(), channel_size, zmq_sndmore);

# 傳送

ret = zmq_send(pubsocket, data_encode.data(), data_encode.size() + 1, 0);

接收端接收資料

zmq_msg_t recv_msg;

int size_t;

string ip = "";

zmq_msg_init(&recv_msg);

zmq_msg_recv(&recv_msg, sub_c1, 0);

size_t = zmq_msg_size(&recv_msg);

for (int i = 0; i < size_t; i++)

ip.push_back(((char*)zmq_msg_data(&recv_msg))[i]);

關閉套接字

zmq_close(pubsocket);

zmq_close(sub_c1);

zmq_ctx_destroy(context_pub);

zmq_ctx_destroy(context_sub);

中介軟體proxy,可以自動將訊息從「下游」傳遞到「上游」,用來實現基於「訂閱-發布」模式下的雙向傳輸。

中介軟體實際上相當於一管道,每一段可以繫結乙個socket,在「下游」的socket收到的任何資訊,都會自動傳遞到「上游socket」中,而上下游socket的型別對於中介軟體的傳輸沒有影響。本示例中,下游socket型別是xsub,與xsub連線的是所有客戶端上的pub,且支援新的pub接入,以此實現新客戶端接入功能,並且在客戶端接入時,會先傳送一次自己的ip位址給伺服器端;上游socket型別是xpub,與之連線的是伺服器端的sub。

當有新的伺服器端pub_new建立並接入xsub時,其ip位址資訊的傳遞路徑是:client_pub_new→xsub→spub→server_sub,然後伺服器端會將新的ip位址加入list中,以此實現客戶端接入的動態檢測過程。

# 建立xpub型別的socket,並且繫結本地8888埠

context = zmq.context(

)socket_xpub = context.socket(zmq.xpub)

socket_xpub.bind(

"tcp//*:8888*"

)# 設定xpub_socket的屬性,允許所有訂閱端訊息傳輸到上游

socket_xpub.setsockeopt(zmq.xpub_verbose,1)

# 建立xsub型別的socket,繫結本地8887埠

context = zmq.context(

) socket_xsub = context.socket(zmq.xsub)

socket_xsub.bind(

"tcp://*:8887"

)# 使用中介軟體proxy將xpub和xsub繫結

zmq.proxy(socket_xsub, socket_xpub)

注:中介軟體proxy是乙個持續執行的函式,需要單獨開闢console或者為其單獨開闢執行緒來執行。

訂閱端

# 建立sub型別socket

context = zmq.context(

)socket_sub = context.socket(zmq.sub)

# 繫結本地8888埠,即與proxy的下游相連

xpub_port =

8888

socket_sub.connect(

"tcp://localhost:%d"

% xpub_port)

# 設定socket,不過濾任何資訊

socket_sub.setsockopt(zmq.subscribe,b""

)

發布端

# 建立pub型別的socket

context = zmq.context(

)socket_pub = context.socket(zmq.pub)

# 繫結本地8080埠

pub_port =

8080

socket_pub.bind(

"tcp://*:%d"

% pub_port)

Socket通訊案例

socket通訊案例 服務端 region 服務端 int port 1234 string host 127.0.0.1 ipaddress ip ipaddress.parse host ipendpoint ipe new ipendpoint ip,port socket ssocket n...

訊息通訊庫ZeroMQ 4 0 4安裝指南

一 zeromq介紹 zeromq是乙個開源的訊息佇列系統,按照官方的定義,它是乙個訊息通訊庫,幫助開發者設計分布式和並行的應用程式。首先,我們需要明白,zeromq不是傳統的訊息佇列系統 比如activemq webspheremq rabbitmq等 zeromq可以幫助我們建立自己的訊息佇列系...

訊息通訊庫ZeroMQ 4 0 4安裝指南

一 zeromq介紹 zeromq是乙個開源的訊息佇列系統,按照官方的定義,它是乙個訊息通訊庫,幫助開發者設計分布式和並行的應用程式。首先,我們需要明白,zeromq不是傳統的訊息佇列系統 比如activemq webspheremq rabbitmq等 zeromq可以幫助我們建立自己的訊息佇列系...