Python指令碼消費kafka資料

2021-08-19 23:03:36 字數 3916 閱讀 6826

一、簡介:

詳見:二、安裝

詳見部落格:

三、按照官網的樣例,先跑乙個應用

1、生產者:

from kafka import kafkaproducer

producer = kafkaproducer(bootstrap_servers=['172.21.10.136:9092']) #此處ip可以是多個['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]

for i in range(3):

msg = "msg%d" % i

producer.send('test', msg)

producer.close()

2、消費者(簡單demo):

from kafka import kafkaconsumer

consumer = kafkaconsumer('test',

bootstrap_servers=['172.21.10.136:9092'])

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

啟動後生產者、消費者可以正常消費。

3、消費者(消費群組)

from kafka import kafkaconsumer

consumer = kafkaconsumer('test',

group_id='my-group',

bootstrap_servers=['172.21.10.136:9092'])

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

啟動多個消費者,只有其中可以可以消費到,滿足要求,消費組可以橫向擴充套件提高處理能力

4、消費者(讀取目前最早可讀的訊息)

from kafka import kafkaconsumer

consumer = kafkaconsumer('test',

auto_offset_reset='earliest',

bootstrap_servers=['172.21.10.136:9092'])

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

auto_offset_reset:重置偏移量,earliest移到最早的可用訊息,latest最新的訊息,預設為latest

原始碼定義:

5、消費者(手動設定偏移量)

from kafka import kafkaconsumer

from kafka.structs import topicpartition

consumer = kafkaconsumer('test',

bootstrap_servers=['172.21.10.136:9092'])

print consumer.partitions_for_topic("test") #獲取test主題的分割槽資訊

print consumer.topics() #獲取主題列表

print consumer.subscription() #獲取當前消費者訂閱的主題

print consumer.assignment() #獲取當前消費者topic、分割槽資訊

print consumer.beginning_offsets(consumer.assignment()) #獲取當前消費者可消費的偏移量

consumer.seek(topicpartition(topic=u'test', partition=0), 5) #重置偏移量,從第5個偏移量消費

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

6、消費者(訂閱多個主題)

from kafka import kafkaconsumer

from kafka.structs import topicpartition

consumer = kafkaconsumer(bootstrap_servers=['172.21.10.136:9092'])

consumer.subscribe(topics=('test','test0')) #訂閱要消費的主題

print consumer.topics()

print consumer.position(topicpartition(topic=u'test', partition=0)) #獲取當前主題的最新偏移量

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

7、消費者(手動拉取訊息)

from kafka import kafkaconsumer

import time

consumer = kafkaconsumer(bootstrap_servers=['172.21.10.136:9092'])

consumer.subscribe(topics=('test','test0'))

while true:

msg = consumer.poll(timeout_ms=5) #從kafka獲取訊息

print msg

time.sleep(1)

8、消費者(訊息掛起與恢復)

from kafka import kafkaconsumer

from kafka.structs import topicpartition

import time

consumer = kafkaconsumer(bootstrap_servers=['172.21.10.136:9092'])

consumer.subscribe(topics=('test'))

consumer.topics()

consumer.pause(topicpartition(topic=u'test', partition=0))

num = 0

while true:

print num

print consumer.paused() #獲取當前掛起的消費者

msg = consumer.poll(timeout_ms=5)

print msg

time.sleep(2)

num = num + 1

if num == 10:

print "resume..."

consumer.resume(topicpartition(topic=u'test', partition=0))

print "resume......"

pause執行後,consumer不能讀取,直到呼叫resume後恢復。

如果對您有幫助,記得給我點贊諾

如果對您有幫助,記得給我點贊諾

Python指令碼消費kafka資料

一 簡介 詳見 二 安裝 詳見部落格 三 按照官網的樣例,先跑乙個應用 1 生產者 from kafka import kafkaproducer producer kafkaproducer bootstrap servers 172.21.10.136 9092 此處ip可以是多個 0.0.0....

kafka消費原理

consumer 採用 pull 拉 模式從 broker 中讀取資料。push 推 模式很難適應消費速率不同的消費者,因為訊息傳送速率是由 broker 決定的。它的目標是盡可能以最快速度傳遞訊息,但是這樣很容易造成 consumer 來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞。而 pul...

kafka 主動消費 Kafka消費者的使用和原理

publicstaticvoidmain string args finally 前兩步和生產者類似,配置引數然後根據引數建立例項,區別在於消費者使用的是反序列化器,以及多了乙個必填引數 group.id,用於指定消費者所屬的消費組。關於消費組的概念在 kafka中的基本概念 中介紹過了,消費組使得...