Kafka消費者 從Kafka讀取資料

2021-10-01 22:14:14 字數 2911 閱讀 2241

目錄

前言:1、kafkaconsumer概念

1.1、消費者和消費者群組

1.2 、消費者群組和分割槽再均衡

2、建立kafka消費者

3、訂閱主題

4、輪詢

5、消費者的配置

6、提交和偏移量

7、再均衡***

8、從特定偏移量處開始處理記錄

9、如何退出

10、反序列化器

應用程式使用

kafkaconsumer

向 kafka

訂閱主題,並從訂閱的主題上接收訊息。 從

kafka 讀取資料不同於從其他訊息系統讀取資料。

假設我們有乙個應用程式需要從乙個

kafka

主題讀取訊息並驗證這些訊息,然後再把它們 儲存起來。應用程式需要建立乙個消費者物件,訂閱主題並開始接收訊息,然後驗證訊息 並儲存結果。過了一陣子,生產者往主題寫入訊息的速度超過了應用程式驗證資料的速度,這個時候該怎麼辦?如果只使用單個消費者處理訊息,應用程式會遠跟不上訊息生成 的速度。顯然,此時很有必要對消費者進行橫向伸縮。就像多個生產者可以向相同的主題 寫入訊息一樣,我們也可以使用多個消費者從同乙個主題讀取訊息,對訊息進行分流。

kafka

消費者從屬於

消費者群組

。乙個群組裡的消費者訂閱的是同乙個主題,每個消費者 接收主題一部分分割槽的訊息。

假設主題

t1 有

4 個分割槽,我們建立了消費者

c1,它是群組

g1 裡唯一的消費者,我們用 它訂閱主題 t1。

消費者

c1 將收到主題

t1 全部

4 個分割槽的訊息,如下圖1-1圖

所示。

圖1-1、乙個消費者收到四個分割槽的訊息

如果在群組

g1 裡新增乙個消費者

c2,那麼每個消費者將分別從兩個分割槽接收訊息。我們 假設消費者 c1

接收分割槽

0 和分割槽

2 的訊息,消費者

c2 接收分割槽

1 和分割槽

3 的訊息,如圖 1-2 所示。

圖1-2、兩個消費者收到四個分割槽的訊息

如果群組

g1 有

4 個消費者,那麼每個消費者可以分配到乙個分割槽,如圖 1

-3 所示。

圖1-3、四個消費者收到四個分割槽的訊息

如果我們往群組裡新增更多的消費者,超過主題的分割槽數量,那麼有一部分消費者就會被 閒置,不會接收到任何訊息,如圖 1-4

所示。

圖1-4、五個消費者收到四個分割槽的訊息

往群組裡增加消費者是橫向伸縮消費能力的主要方式。

kafka

消費者經常會做一些高延遲 的操作,比如把資料寫到資料庫或 hdfs

,或者使用資料進行比較耗時的計算。在這些情 況下,單個消費者無法跟上資料生成的速度,所以可以增加更多的消費者,讓它們分擔負 載,每個消費者只處理部分分割槽的訊息,這就是橫向伸縮的主要手段。我們有必要為主題 建立大量的分割槽,在負載增長時可以加入更多的消費者。不過要注意,

不要讓消費者的數 量超過主題分割槽的數量

,多餘的消費者只會被閒置。

除了通過增加消費者來橫向伸縮單個應用程式外,還經常出現多個應用程式從同乙個主題 讀取資料的情況。實際上,kafka

設計的主要目標之一,就是要讓

kafka

主題裡的資料能夠滿足企業各種應用場景的需求。在這些場景裡,每個應用程式可以獲取到所有的訊息, 而不只是其中的一部分。只要保證每個應用程式有自己的消費者群組,就可以讓它們獲取 到主題所有的訊息。不同於傳統的訊息系統,橫向伸縮 kafka

消費者和消費者群組並不會對效能造成負面影響。

在上面的例子裡,如果新增乙個只包含乙個消費者的群組

g2,那麼這個消費者將從主題 t1 上接收所有的訊息,與群組

g1 之間互不影響。群組

g2 可以增加更多的消費者,每個消費者可以消費若干個分割槽,就像群組 g1

那樣,如圖 1

-5 所示。

總的來說,群組

g2 還是會接收到所有訊息,不管有沒有其他群組存在。

簡而言之,為每乙個需要獲取乙個或多個主題全部訊息的應用程式建立乙個消費者群組,然後往群組裡新增消費者來伸縮讀取能力和處理能力,群組裡的每個消費者只處理一部分訊息。

圖1-5、兩個消費者對應乙個主題

群組裡的消費者共同讀取主題的分割槽。乙個新的消費者加 入群組時,它讀取的是原本由其他消費者讀取的訊息。當乙個消費者被關閉或發生崩潰 時,它就離開群組,原本由它讀取的分割槽將由群組裡的其他消費者來讀取。

在主題發生變 化時,比如管理員新增了新的分割槽,會發生分割槽重分配。分割槽的所有權從乙個消費者轉移到另乙個消費者,這樣的行為被稱為再均衡。再均衡非常重要,它為消費者群組帶來了高可用性和伸縮性(我們可以放心地新增或移除消費者), 不過在正常情況下,我們並不希望發生這樣的行為。在再均衡期間,消費者無法讀取訊息,造成整個群組一小段時間的不可用。另外,當分割槽被重新分配給另乙個消費者時,消費者當前的讀取狀態會丟失,它有可能還需要去重新整理快取,在它重新恢復狀態之前會拖慢應用程式。

kafka 主動消費 Kafka消費者的使用和原理

publicstaticvoidmain string args finally 前兩步和生產者類似,配置引數然後根據引數建立例項,區別在於消費者使用的是反序列化器,以及多了乙個必填引數 group.id,用於指定消費者所屬的消費組。關於消費組的概念在 kafka中的基本概念 中介紹過了,消費組使得...

kafka消費者無法消費異常

今天被乙個kafka消費異常折磨了一天,頭差點炸了,還好最後解決了它 異常 伺服器 record is corrupt 記錄損壞 不明原因 有可能磁碟空間不足導致 導致消費者無法正常消費訊息 卡在某乙個offset 不能繼續消費 解決辦法 先停掉消費者程式 殺掉程序 不可關閉kafka服務 然後手動...

十 kafka複雜消費者

注意 對於多個partition和多個consumer 1 如果consumer比partition多,是浪費,因為kafka的設計是在乙個partition上是不允許併發的,所以consumer數不要大於partition數 2 如果consumer比partition少,乙個consumer會對...