python使用kafka收發訊息

2021-10-14 14:53:27 字數 2844 閱讀 9993

kafka是最近幾年很流行的訊息佇列中介軟體。在大資料以及後端服務領域有很廣泛的應用。廢話不多說,接下來直接上**介紹python如何向kafka傳送資料以及訂閱資料。

kafka的訊息是 " 發布--訂閱" 模式的。 接下來先介紹向kakfa發布訊息。先安裝python的kafka連線模組。pip install kafka-python.

import time

from kafka import kafkaproducer

class kafkamsgproducer:

def __init__(self, server):

self._server = server

self.producer = none

def connect(self):

if self.producer is none:

producer = kafkaproducer(bootstrap_servers=self._server)

self.producer = producer

def close(self):

if self.producer is not none:

self.producer.close()

self.producer = none

def send(self, topic, msg):

if self.producer is not none:

if not isinstance(msg, bytes):

msg = msg.encode("utf-8") # 將str型別轉換為bytes型別

self.producer.send(topic=topic, value=msg)

def run():

producer = kafkamsgproducer("localhost:9092")

producer.connect() # 建立連線

topic = "yanchampion-test"

print("start sending msg to kafka!")

for msg in "hello! this is yanchampion speaking!".split():

producer.send(topic=topic, msg=msg) # 向kafka 指定topic傳送資料

time.sleep(1)

if __name__ == '__main__':

run() # 執行發布訊息程式

以上**即可以向kafka指定topic發布訊息了。注意,為了測試,先不執行producer.py

import time

from kafka import kafkaproducer

class kafkamsgproducer:

def __init__(self, server):

self._server = server

self.producer = none

def connect(self):

if self.producer is none:

producer = kafkaproducer(bootstrap_servers=self._server)

self.producer = producer

def close(self):

if self.producer is not none:

self.producer.close()

self.producer = none

def send(self, topic, msg):

if self.producer is not none:

if not isinstance(msg, bytes):

msg = msg.encode("utf-8") # 將str型別轉換為bytes型別

self.producer.send(topic=topic, value=msg)

def run():

producer = kafkamsgproducer("localhost:9092")

producer.connect() # 建立連線

topic = "yanchampion-test"

print("start sending msg to kafka!")

for msg in " 111 222 333 444".split():

producer.send(topic=topic, msg=msg) # 向kafka 指定topic傳送資料

time.sleep(1)

if __name__ == '__main__':

run() # 執行程式

以上**即可完成訊息的訂閱。

因為kafka是 發布-定於模式。所以,乙個topic可以有多個consumer訂閱,並且,每個consumer都可以收到同一條訊息。那麼讓我們先來執行兩個consumer.py檔案。

開啟不同的終端

python3 consumer.py

接下來再執行producer.py

python3 producer.py

通過觀察,最終可以看到,兩個執行了consumer.py 的終端 都可以收到訊息

[root@yanchampion kafka-demo]# python3 consumer.py 

收到訊息: b'111'

收到訊息: b'222'

收到訊息: b'333'

收到訊息: b'444'

使用python連線kafka

kafka是高吞吐的訊息佇列系統,輕鬆支援每秒百萬級的寫入請求,這種特性也使得kafka在日誌處理等海量資料場景廣泛應用。kafka依賴於zookeeper執行,zookeeper充當了協調和管理kafka集群的任務,並且儲存一些meta資訊。此處,因作者能力有限,不詳細討論kafka與zookee...

使用python操作kafka

使用python操作kafka目前比較常用的庫是kafka python庫 pip3 install kafka pythonproducer test.py from kafka import kafkaproducer producer kafkaproducer bootstrap serve...

Python使用多程序實現串列埠收發資料

在之前一篇文章中 python使用多執行緒實現串列埠收發資料,提到了使用多執行緒實現串列埠收發資料,曉得多執行緒的朋友可能會有點疑問 多執行緒是單cpu,雖然在io中速度比較快,但是對於乙個大的專案,多執行緒本身是加速不了太多的 針對這個問題,我用multiprocessing改了一下 window...