python連線kafka生產者,消費者指令碼

2021-08-20 10:55:25 字數 3343 閱讀 3137

#

-*- coding: utf-8 -*-

'''''

使用kafka-python 1.3.3模組

# pip install kafka==1.3.5

# pip install kafka-python==1.3.5

'''import

sysimport

time

import

json

from kafka import

kafkaproducer

from kafka import

kafkaconsumer

from kafka.errors import

kafkaerror

kafaka_host = "

101.236.51.235

"kafaka_port = 9092kafaka_topic = "

test

"class

kafka_producer():

'''''

生產模組:根據不同的key,區分訊息

'''def

__init__

(self, kafkahost,kafkaport, kafkatopic, key):

self.kafkahost =kafkahost

self.kafkaport =kafkaport

self.kafkatopic =kafkatopic

self.key =key

print("

producer:h,p,t,k

",kafkahost,kafkaport,kafkatopic,key)

bootstrap_servers = ':'

.format(

kafka_host=self.kafkahost,

kafka_port=self.kafkaport

)print("

boot svr:

",bootstrap_servers)

self.producer = kafkaproducer(bootstrap_servers =bootstrap_servers

)defsendjsondata(self, params):

try:

parmas_message = json.dumps(params,ensure_ascii=false)

producer =self.producer

print

(parmas_message)

v = parmas_message.encode('

utf-8')

k = key.encode('

utf-8')

print("

send msg:(k,v)

",k,v)

producer.send(self.kafkatopic, key=k, value=v)

producer.flush()

except

kafkaerror as e:

print

(e)class

kafka_consumer():

'''''

消費模組: 通過不同groupid消費topic裡面的訊息

'''def

__init__

(self, kafkahost, kafkaport, kafkatopic, groupid):

self.kafkahost =kafkahost

self.kafkaport =kafkaport

self.kafkatopic =kafkatopic

self.groupid =groupid

self.key =key

self.consumer = kafkaconsumer(self.kafkatopic, group_id =self.groupid,

bootstrap_servers = ':'

.format(

kafka_host=self.kafkahost,

kafka_port=self.kafkaport )

)defconsume_data(self):

try:

for message in

self.consumer:

yield

message

except

keyboardinterrupt as e:

print

(e)def

main(xtype, group, key):

'''''

測試consumer和producer

'''if xtype == "p"

:

#生產模組

producer =kafka_producer(kafaka_host, kafaka_port, kafaka_topic, key)

print ("

**********=> producer:

", producer)

for _id in range(100):

params = '

' %str(_id)

params=[,]

producer.sendjsondata(params)

time.sleep(1)

if xtype == 'c'

:

#消費模組

consumer =kafka_consumer(kafaka_host, kafaka_port, kafaka_topic, group)

print ("

**********=> consumer:

", consumer)

message =consumer.consume_data()

for msg in

message:

print ('

msg---------------->k,v

', msg.key,msg.value)

print ('

offset---------------->

', msg.offset)

if__name__ == '

__main__':

xtype = sys.argv[1]

group = sys.argv[2]

key = sys.argv[3]

main(xtype, group, key)

使用方式

生產訊息

python testkafka.py p g k

消費訊息

python testkafka.py c g k

使用python連線kafka

kafka是高吞吐的訊息佇列系統,輕鬆支援每秒百萬級的寫入請求,這種特性也使得kafka在日誌處理等海量資料場景廣泛應用。kafka依賴於zookeeper執行,zookeeper充當了協調和管理kafka集群的任務,並且儲存一些meta資訊。此處,因作者能力有限,不詳細討論kafka與zookee...

python3 連線kafka生產測試資料

log pip3 install kafka pip3 install kafka python 1.通過指令碼實現讓kafka生產測試資料,測試下游業務服務效能 2.可以增加執行緒池,讓多執行緒併發執行,效果更好 usr bin env python encoding utf 8 author y...

Kafka生產過程

1.寫入方式 寫磁碟效率比隨機寫記憶體要高,保障kafka吞吐率 2.分割槽 partition kafka集群有多個訊息 伺服器 broker server 組成,發布到kafka集群的每條訊息都有乙個類別,用主題 topic 來表示。通常,不同應用產生不同型別的資料,可以設定不同的主題。乙個主題...