zmq pub sub通訊之ipc雙向主題

2021-10-03 07:08:49 字數 2758 閱讀 8710

zmq pub-sub, push-pull模式沒有客服端服務端啟動先後順序的限制,與普通的socket通訊不一樣,必須先啟動服務端。

以下是測試程式,pub.py為服務端,sub.py客戶端。

pub.py

# coding: utf-8

import zmq

import time

import threading

import os

import stat

# 分類後的日誌的zmq的pub位址

log_type_pub_path = "ipc:///tmp/log_types.ipc"

# simulator 日誌的zmq的sub位址

log_sub_path = "ipc:///tmp/log_lator.ipc"

topic_list = ["lator", "att"]

def unlink_ipc(path):

index = path.rfind('ipc://')

if index < 0:

return

fpath = path[len('ipc://'):]

#if os.path.exists(fpath):

os.unlink(fpath)

def pub(pubaddr, topic):

context = zmq.context()

sock = context.socket(zmq.pub)

sock.set_hwm(100)

#unlink_ipc(pubaddr)

sock.bind(pubaddr)

counter = 1

os.chmod(pubaddr[len('ipc://'):], stat.s_irwxo + stat.s_irwxg + stat.s_irwxu)

zpath = sock.getsockopt(zmq.last_endpoint)

print zpath

while true:

messagedata = "this is msg fro topic one %s" % counter

print "%s %s" % (topic, messagedata)

sock.send("%s %s" % (topic, messagedata))

counter = counter + 1

time.sleep(1)

if __name__ == "__main__":

t1 = threading.thread(target=pub, args=(log_type_pub_path, "lator"))

t2 = threading.thread(target=pub, args=(log_sub_path, "att"))

t1.start()

t2.start()

t1.join()

t2.join()

sub.py

# coding: utf-8

import os

import zmq

from zmq.eventloop.ioloop import ioloop

from zmq.eventloop.zmqstream import zmqstream

# 分類後的日誌的zmq的pub位址

log_type_pub_path = "ipc:///tmp/log_types.ipc"

# simulator 日誌的zmq的sub位址

log_sub_path = "ipc:///tmp/log_lator.ipc"

topic_list = ["lator", "att"]

def unlink_ipc(path):

index = path.rfind('ipc://')

if index < 0:

return

fpath = path[len('ipc://'):]

if os.path.exists(fpath):

os.unlink(fpath)

def recv_func(msg):

print msg

def main2():

loop_instance = ioloop.instance()

ctx = zmq.context.instance()

sock = ctx.socket(zmq.sub)

sock.set_hwm(100)

sock.connect(log_type_pub_path)

sock.connect(log_sub_path)

for key in topic_list:

if isinstance(key, str):

sock.setsockopt(zmq.subscribe, key)

elif isinstance(key, unicode):

sock.setsockopt_string(zmq.subscribe, key)

else:

print("log_broker to set subscribe error:%s" % key)

sock = zmqstream(sock, loop_instance)

sock.on_recv(recv_func)

loop_instance.start()

if __name__ == "__main__":

main2()

ipc通訊之管道

首先 一 無名管道pipe 1,沒有名字的 2,半雙工 讀寫不能同時進行 3,通過直系親屬訪問繼承 4,管道缺省會阻塞 5,不能用lseek定位 6,操作沒有原子性 示例 include include include include include include include void sig...

程序間通訊之 IPC

有三種稱做xsi ipc的ipc 訊息佇列 message queues 訊號量 semaphores 以及共享記憶體 shared memory 每個核心中的ipc結構 訊息佇列,訊號量和共享儲存段 都用乙個非負整數的識別符號來加以引用。要向乙個佇列中傳送訊息或讀取訊息只需要知道其佇列識別符號即可...

IPC通訊之共享記憶體

共享記憶體就是使得多個程序可以訪問同一塊記憶體空間,是最快的可用 ipc形式。是針對其他通訊機制執行效率較低而設計的。往往與其它通訊機制,如訊號量結合使用,來達到程序間的同步及互斥。我們通過一張圖來表示這個關係 共享記憶體和訊息佇列,訊號量一樣都屬於xsi ipc。核心都為他們維護了一套資料結構 同...