Kafka生產者和消費者

2021-10-12 10:39:39 字數 3598 閱讀 8109

一、生產者

1、分割槽的原因

方便在集群中擴充套件:

每個分割槽都可以通過調整副本數,改變分割槽副本所佔的kafka節點。

每個topic又有多個分割槽,這樣就可以靈活的改變集群的大小和所佔的機器數

可以提高併發:

同乙個topic的資料,可以分散到不同的分割槽,

而不同的分割槽資料可以被不同的consumer併發消費。

2、分割槽的原則

我們需要將producer傳送的資料封裝成乙個producerrecord物件。

(1)、指明 partition 的情況下,資料往指定分割槽裡面傳送。

(2)、沒有指明 partition 值但有 key 的情況下,通過key進行hash計算,

算出對應的partition進行傳送資料

(3)、既沒有 partition 值又沒有 key 值的情況下,

使用round-robin演算法(輪詢),進行傳送資料。

(4)、partition:

producerrecord可以只包含topic和訊息的value,

key預設是null,但是大多數應用程式會用到key。

key可以用以區分資料要傳送到那個分割槽。

如果為null,則使用輪詢演算法傳送到不同的分割槽

如果有key,則相同key的資料傳送到同乙個分割槽,保持訊息的順序性。

還可以實現partitioner介面,來自定義演算法傳送到那個分割槽。

3、資料可靠性保證

三種ack模式:

1、acks引數為0:

傳送完就不管了,不管leader和follower是否接收到了訊息,

效率最高,但是可能會丟失資料。

2、acks引數為1:

傳送完成之後,需要leader副本收到訊息之後,返回乙個應答。

3、 acks引數為2:

傳送完成之後,需要leader和follow副本收到訊息之後,

返回乙個應答。

二、生產者

1、消費者和消費者組

(1)、生產者把資料傳送到同乙個topic的不同分割槽,

然後consumer組去監聽這個topic,consumer組裡面的

多個consumer去分別消費不同分割槽裡面的訊息。

(2)、topic對應consumer組,topic裡面不管有多少個分割槽,consumer組都可以接收,

但是乙個分割槽資料,只能被組內的乙個consumer消費。

(3)、topic也可以往多個consumer組傳送,多個consumer組消費相同的topic資料,

每個組的消費邏輯和上面的(2)一樣。

(4)、組中 consumer 的數量超過分割槽數,多出的 consumer 會被閒置。

因此,如果想提高消費者的並行處理能力,需要設定足夠多的 partition 數量。

2、同一系統kafka集群處理高併發消費問題:

乙個topic多設定分割槽,消費組裡面設定對應數量的消費者,這樣就可以將同乙個topic的資料分散到不同的分割槽,

然後由消費組裡面的不同的消費者分別消費不同的分割槽資料,達到併發消費效果。

3、不同系統和kafka集群的負載均衡

只要保證每個應用程式有自己的 consumer group,就可以獲取到 topic 所有的訊息:

同乙個topic中的不同分割槽資料,會傳送到不同的 consumer group,

這些 consumer group裡面的多個consumer去消費不同分割槽的資料,

同乙個分割槽的資料只能被乙個consumer 消費到。

不同的系統去消費不同的consumer group,這樣就實現了多系統之間的訊息共享。

4、建立 kafka 消費者

(1)、在讀取訊息之前,需要先建立乙個 kafkaconsumer 物件

<1>、設定kafka集群訊息,

<2>、設定消費者所屬組

<3>、設定key和value序列化

(2)、建立了消費者之後,需要訂閱 topic,

subscribe() 方法接受乙個主題列表作為引數:

5、關於訊息順序

(1)、不關注訊息順序的情況下:

生產者像topic傳送資料,不指定key,不指定分割槽,

這樣訊息會被負載到多個分割槽上去。

消費者組裡面有多個消費者,這些消費者去會主動去消費不同的分割槽資料,

這樣就可以提高效率。

(2)、關注訊息順序的情況下:

生產者向topic傳送資料的時候,就需要指定key或者

自己實現partition介面計算傳送到那個分割槽。

這種需要順序的資料,肯定是只能往乙個分割槽傳送,

因為kafka只能保證分區內的順序。

消費者從固定分割槽讀取資料,這個時候是可以保持順序的,

但是讀取之後一般會使用多執行緒的方式來處理訊息,

這個時候就保證不了順序了。

可以加乙個記憶體 queue佇列,消費者的資料先往記憶體 queue裡面存,

然後用多執行緒的方式去處理記憶體 queue裡面的資料,

這樣就可以保證訊息順序。

三、總結

1、單獨乙個系統處理

(1)、亂序併發處理

乙個topic分割槽多個,每個分割槽按照副本數,最終在不同的kafka節點上生成分割槽副本。

生產者發訊息不指定對應的分割槽,資料會分散到不同的分割槽上

(實際上會落在每個分割槽的leadrd副本上)。

消費者組指定消費topic資料,組內的消費者就會各自消費乙個

(並且同組的消費者也只能消費乙個分割槽資料)分割槽的資料。

因為消費者只能記錄乙個分割槽的偏移量,所以當資料被拆分成多個分割槽的時候,

資料的偏移量就失去意義了

(2)、順序處理

生產者發訊息指定特定的分割槽

消費者組裡面的消費者也會只消費者乙個分割槽的資料,從而確保消費訊息的順序。

2、多個系統處理

只要保證每個應用程式有自己的 consumer group,就可以獲取到 topic 所有的訊息,

各個系統就可以各種處理訊息。

Kafka的生產者和消費者

org.slf4j slf4j log4j12 1.7.25 org.slf4j slf4j api 1.7.25 org.apache.kafka kafka clients 0.10.2.1 至少需要2個jar public class sendmessageproducer long end ...

Kafka消費者生產者例項

它允許發布和訂閱記錄流,類似於訊息佇列或企業訊息傳遞系統。它可以容錯的方式儲存記錄流。它可以處理記錄發生時的流。由於主要介紹如何使用kafka快速構建生產者消費者例項,所以不會涉及kafka內部的原理。乙個基於kafka的生產者消費者過程通常是這樣的 來自官網 cd kafka 2.11 0.11....

Kafka消費者生產者例項

2017年07月30日18 22 56 rhwayfunn 閱讀數 13818標籤 kafka 更多 個人分類 分布式系統 為了更為直觀展示卡夫卡的訊息生產消費的過程,我會從基於控制台和基於應用兩個方面介紹使用例項.kafka是乙個分布式流處理平台,具體來說有三層含義 它允許發布和訂閱記錄流,類似於...