python向kafka傳送資料,並接收

2021-10-06 06:19:10 字數 1587 閱讀 3994

傳送端

import csv

import time

from kafka import kafkaproducer

from kafka import kafkaconsumer

import json

# 例項化乙個kafkaproducer示例,用於向kafka投遞訊息

producer = kafkaproducer(value_serializer=

lambda v: json.dumps(v)

.encode(

'utf-8'

),bootstrap_servers=

'192.168.130.28:9092'

)for x in

range(0

,1000):

time.sleep(

0.1)

# 每隔0.1秒傳送一行資料

# 傳送資料,topic為'test_data'

data =

print

(data)

producer.send(

'test_data'

, data)

我們再做乙個接收端

from kafka import kafkaconsumer

consumer = kafkaconsumer(

'test_data'

, bootstrap_servers=

['192.168.130.29:9092'])

for msg in consumer:

print

(msg.value)

recv =

"%s:%d:%d: key=%s value=%s"

%(msg.topic, msg.partition, msg.offset, msg.key, msg.value)

print

(recv)

很簡單的例子

錯誤syntaxerror: invalid syntax的解決方法總結

python -m pip install kafka-python
這裡有個問題就是消費者,如果不配置的話,消費者每次開啟後都會從最新的讀取,導致歷史資料沒辦法讀取出來,我們需要配置一下kafkaconsumer。

auto_offset_reset = earliest,只配置這個,會從kafka初始的資料消費,重複消費之前的資料。

我們需要再配置group_id=『my_group_new』。這樣就可以了,

bootstrap_servers,可以配置集群

consumer = kafkaconsumer(

'filestorage'

, group_id=

'my_group_new'

,auto_offset_reset=

'earliest'

,bootstrap_servers=

['xx:9092'

,'xx:9092'

,'xx:9092'],

)

Kafka 訊息傳送

建立乙個kafkaprodecer物件,傳入上面建立的properties物件 kafkaproducerproducer new kafkaproducer mykafkaprops 使用prodecerrecord string topic,string key,string value 建構函...

kafka訊息傳送模式

在kafka 0.8.2之後,producer不再區分同步 sync 和非同步方式 async 所有的請求以非同步方式傳送,這樣提公升了客戶端效率。producer請求會返回乙個應答物件,包括偏移量或者錯誤信。這種非同步方地批量的傳送訊息到kafka broker節點,因而可以減少server端資源...

Kafka 傳送訊息流程

客戶端的幾個元件 一條訊息首先需要確定要被儲存到那個 partition 對應的雙端佇列上 其次,儲存訊息的雙端佇列是以批的維度儲存的,即 n 條訊息組成一批,一批訊息最多儲存 n 條,超過後則新建乙個組來儲存新訊息 其次,新來的訊息總是從左側寫入,即越靠左側的訊息產生的時間越晚 最後,只有當一批訊...