python kafka,從指定位置消費資料

2022-03-20 00:54:08 字數 1040 閱讀 1651

# @staticmethod

def get_kafka_reviews(self):

# print type(self.bootstrap_servers)

consumer = kafka.kafkaconsumer(bootstrap_servers=[self.bootstrap_servers],group_id='wm_group',auto_offset_reset='latest', enable_auto_commit=false)

consumer.subscribe(topics=(self.topics)) #訂閱要消費的主題

# print consumer.topics()

# print "+++++++",consumer.position(topicpartition(topic=u'ctripapi_duplicateddata_review', partition=1)) #獲取當前主題的最新偏移量

review_list =

for message in consumer:

print '====%s:%d:%d:key-%s value=%s=='%(message.topic,message.partition,message.offset,message.key,message.value)

if len(review_list)==self.num: #先取100條來消費

break

return review_list

解釋:

consumer = kafka.kafkaconsumer(bootstrap_servers=[self.bootstrap_servers],group_id='wm_group',auto_offset_reset='latest', enable_auto_commit=false)

自動提交位移設為flase, 預設為取最新的偏移量,重新建立乙個guou_id,這樣就實現了不影響別的應用程式消費資料,又能消費到最新資料,實現預警(先於使用者發現)的目的。

mysql 自增長從指定位置開始增長

create table tab incoming project id int 11 not null auto increment,proid int 11 not null,projectname varchar 255 not null,partner int 11 not null,yea...

如何從檔案中提取指定位置的資料????

如何從檔案中提取指定位置的資料?vcl元件開發及應用 資料如下 begin of epoch gps utc 172800.0 2006 12 12 0 0 0.0 plh ddmmss.ss clk ztd m 385508.36127 770358.37552 40.803 191.177 0....

linux提取指定行至指定位置

bin csh f if f errorlog.rpt then rm rf errorlog.rpt endif ls log loglst.lst 將log檔案寫到指定檔案 loop execute set n wc l loglst.lst 得到log檔案個數 行數 echo n set i ...