使KafKa每次讀取訊息到最新傳送訊息的解決方案

2021-08-21 11:54:22 字數 1810 閱讀 5154

情景是使kafka每次讀取訊息到最新傳送訊息,查了很多資料,對kafka的消費組偏移量也有些研究,但本地集群不同版本都有不少不同之處。即使目前解決了該問題,仍有不少坑待填(之前想在這邊放下關於消費組和偏移量的東西,但比較多比較雜,就開了乙個新坑:

考慮直接去掉偏移量,但清空偏移量想要reset乙個group的offset,但沒找到相應的操作的方法。於是就考慮是在建立group的時候就使brokers不要記錄它的offset:

在**裡實現是設定引數enable_auto_commit=false

但在程式中使用就有問題,建立和提交時自動監測,程式沒有偏移量就會報錯

因為沒法去掉偏移量,準備通過不斷新增groupid,使每個group獲取最新偏移量。

**實現方法是在groupid後加時間戳,**每次啟動使用新的group。此時初始偏移量offset的配置為kafka.auto.offset.reset=latest,可以做到每次都唯讀最新資料。

但不斷新建,topic下的group迅速增加,偏移量___consumer_offsets 多次儲存。且沒找到有效在topic下刪除group_id的方法,也沒辦法做到定期清理,會對效能產生影響。

就是呼叫官方提供的api,但這個api是簡單消費者api,具體講解在前面的鏈結裡

在**裡就是seektond方法(需要注意的是,如果你是初始化kafkaconsumer的時候指定的topic(high-level consumer 的 api),即使用subscribe方法,然後執行seektond方法會報錯。這是沒有指定分割槽而自動分割槽,同後面提交分割槽衝突。

必須在初始化之後呼叫assign()方法來指定抓哪個topic的哪個partition裡的資料,然後再呼叫seektond方法)。示例**如下:

listtopicpartitionlist= new arraylist<>();

topicpartitionlist.add(new topicpartition(this.consumer_kafka_topic,0));

this.consumer.assign(topicpartitionlist);

this.consumer.seektoend(topicpartitionlist);

因為當前集群指定分割槽編號為零。且僅有乙個分割槽,所以可以較好的滿足需求,實現目標。但對於需要多個分割槽的問題,這方面的設定就會比較複雜。

1.___consumer_offsets的產生速度很快,在集群其他topic沒有接入資料時,仍不斷重新整理消費者的偏移量,從不間斷。如此高頻次的記錄是為了什麼?

2.kafka權威指南 有一句話:如果消費者發生崩潰或者有新的消費者加入群組,就會觸發再均衡,完成再均衡之後,每個消費者可能分配到新的分割槽... 消費者加入,再均衡,分割槽 三者之間的關係是什麼。

更新7.16(沒錯,就是發布當天)

更新7.23

重新修改了敘述的問題(還好沒被很多人看到)。提出乙個當初忽略的問題:

3.如何刪除topic下的group_id,(之前版本group_id都是放在zookeeper下的,操作比較簡單),現在(0.10.1.1)放在boorker中,不知道如何操作,刪除。

volatile 每次都從該位址讀取

volatile關鍵字是一種型別修飾符,用它宣告的型別變數表示可以被某些編譯器未知的因素更改,比如 作業系統 硬體或者其它執行緒等。遇到這個關鍵字宣告的變數,編譯器對訪問該變數的 就不再進行優化,從而可以提供對特殊位址的穩定訪問。使用該關鍵字的例子如下 int volatile nvint 當要求使...

Python socket 讀取訊息問題

今天遇到python socket 讀取訊息不能讀取完全的問題 size struct.unpack i socket obj.recv 4 data size int size 0 data socket obj.recv data size 如此寫法並不能完全讀取訊息內容。原因是scoket r...

kafka消費讀取不了

問題一 啟動storm時,發現kafka一直讀取不了資料,但是查詢時,仍有資料沒有讀取 一直讀取不了,看了下 是kafka消費時,一直讀取不了資料 查詢,很符合kafka重複讀取引起的問題。確實storm讀取時是自動提交的,並且修改group,可以正常執行。因為是在開發環境,所以只是想測試下stor...