kafka生產者和消費者(未整理完,待續)

2021-10-18 13:26:22 字數 4092 閱讀 7036

1、分割槽的原因:

a 方便在集群中擴充套件

每個partition可以通過調整以適應它所在的機器,而乙個topic又可以有多個partition組成,因此整個集群就可以適應任意大小的資料了

b 提高併發

可以以partition為單位讀寫

2、分割槽的原則

我們需要將producer傳送的資料封裝成乙個producerrecord物件。

(1)指明 partition 的情況下,直接將指明的值直接作為 partiton 值;

(2)沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數進行取餘得到 partition 值;

(3)既沒有 partition 值又沒有 key 值的情況下,所有發往指定話題的records,會積攢成乙個batch(達到一定大小或者兩條訊息間隔過長)一起傳送到乙個分割槽 。當形成新的batch,我們會隨機選擇乙個新的分割槽傳送。

為保證producer傳送的資料,能可靠的傳送到指定的topic,topic的每個partition收到producer傳送的資料後,都需要向producer傳送ack(acknowledgement確認收到),如果producer收到ack,就會進行下一輪的傳送,否則重新傳送資料。

kafka選擇了第二種方案,原因如下:

(1)同樣為了容忍n臺節點的故障,第一種方案需要2n+1個副本,而第二種方案只需要n+1個副本,而kafka的每個分割槽都有大量的資料,第一種方案會造成大量資料的冗餘。

(2)雖然第二種方案的網路延遲會比較高,但網路延遲對kafka的影響較小。

2)isr

採用第二種方案之後,設想以下情景:leader收到資料,所有follower都開始同步資料,但有乙個follower,因為某種故障,遲遲不能與leader進行同步,那leader就要一直等下去,直到它完成同步,才能傳送ack。這個問題怎麼解決呢?

leader維護了乙個動態的in-sync replica set (isr),意為和leader保持同步的follower集合。當isr中的follower完成資料的同步之後,leader就會給producer傳送ack。如果follower長時間未向leader同步資料,則該follower將被踢出isr,該時間閾值由replica.lag.time.max.ms引數設定。leader發生故障之後,就會從isr中選舉新的leader。

3)ack應答機制

對於某些不太重要的資料,對資料的可靠性要求不是很高,能夠容忍資料的少量丟失,所以沒必要等isr中的follower全部接收成功。

所以kafka為使用者提供了三種可靠性級別,使用者根據對可靠性和延遲的要求進行權衡,選擇以下的配置。

acks引數配置:

acks:

0:producer不等待broker的ack,這一操作提供了乙個最低的延遲,broker一接收到還沒有寫入磁碟就已經返回,當broker故障時有可能丟失資料;

1:producer等待broker的ack,partition的leader落盤成功後返回ack,如果在follower同步成功之前leader故障,那麼將會丟失資料

-1(all):producer等待broker的ack,partition的leader和follower全部落盤成功後才返回ack。但是如果在follower同步完成後,broker傳送ack之前,leader發生故障,那麼會造成資料重複

4)故障處理細節

(1)follower故障

follower發生故障後會被臨時踢出isr,待該follower恢復後,follower會讀取本地磁碟記錄的上次的hw,並將log檔案高於hw的部分擷取掉,從hw開始向leader進行同步。等該follower的leo大於等於該partition的hw,即follower追上leader之後,就可以重新加入isr了。

(2)leader故障

leader發生故障之後,會從isr中選出乙個新的leader,之後,為保證多個副本之間的資料一致性,其餘的follower會先將各自的log檔案高於hw的部分截掉,然後從新的leader同步資料。

注意:這只能保證副本之間的資料一致性,並不能保證資料不丟失或者不重複。

將伺服器的ack級別設定為-1,可以保證producer到server之間不會丟失資料,即at least once語義。相對的,將伺服器ack級別設定為0,可以保證生產者每條訊息只會被傳送一次,即at most once語義。

at least once可以保證資料不丟失,但是不能保證資料不重複;相對的,at least once可以保證資料不重複,但是不能保證資料不丟失。但是,對於一些非常重要的資訊,比如說交易資料,下游資料消費者要求資料既不重複也不丟失,即exactly once語義。在0.11版本以前的kafka,對此是無能為力的,只能保證資料不丟失,再在下游消費者對資料做全域性去重。對於多個下游應用的情況,每個都需要單獨做全域性去重,這就對效能造成了很大影響。

0.11版本的kafka,引入了一項重大特性:冪等性。所謂的冪等性就是指producer不論向server傳送多少次重複資料,server端都只會持久化一條。冪等性結合at least once語義,就構成了kafka的exactly once語義。即:

at least once + 冪等性 = exactly once

要啟用冪等性,只需要將producer的引數中enable.idompotence設定為true即可。kafka的冪等性實現其實就是將原來下游需要做的去重放在了資料上游。開啟冪等性的producer在初始化的時候會被分配乙個pid,發往同一partition的訊息會附帶sequence number。而broker端會對做快取,當具有相同主鍵的訊息提交時,broker只會持久化一條。

但是pid重啟就會變化,同時不同的partition也具有不同主鍵,所以冪等性無法保證跨分割槽跨會話的exactly once。

consumer採用pull(拉)模式從broker中讀取資料。

push(推)模式很難適應消費速率不同的消費者,因為訊息傳送速率是由broker決定的。它的目標是盡可能以最快速度傳遞訊息,但是這樣很容易造成consumer來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞。而pull模式則可以根據consumer的消費能力以適當的速率消費訊息。

pull模式不足之處是,如果kafka沒有資料,消費者可能會陷入迴圈中,一直返回空資料。針對這一點,kafka的消費者在消費資料時會傳入乙個時長引數timeout,如果當前沒有資料可供消費,consumer會等待一段時間之後再返回,這段時長即為timeout。

乙個consumer group中有多個consumer,乙個 topic有多個partition,所以必然會涉及到partition的分配問題,即確定那個partition由哪個consumer來消費。

kafka有兩種分配策略,一是roundrobin,一是range。

roundrobin策略的原理是將消費組內所有消費者以及消費者所訂閱的所有topic的partition按照字典序排序,然後通過輪詢消費者方式逐個將分割槽分配給每個消費者。

range可以在乙個消費者組中單獨為指定消費者設定要消費的主題。這裡要強調的是

1.range策略針對於每個topic,各個topic之間分配時沒有任何關聯。

2.range 範圍分割槽策略是通過 partitions數/consumer數 來決定每個消費者應該消費幾個分割槽。如果除不盡,那麼前面幾個消費者將會多消費1個分割槽.

sticky

sticky分割槽策略初始分割槽和round robin一樣是輪詢機制。但與round robin機制不一樣的是,在有新的消費者加入到本消費者組時,sticky重新分配的分割槽個數較少,比較節省效能。

Kafka生產者和消費者

一 生產者 1 分割槽的原因 方便在集群中擴充套件 每個分割槽都可以通過調整副本數,改變分割槽副本所佔的kafka節點。每個topic又有多個分割槽,這樣就可以靈活的改變集群的大小和所佔的機器數 可以提高併發 同乙個topic的資料,可以分散到不同的分割槽,而不同的分割槽資料可以被不同的consum...

Kafka的生產者和消費者

org.slf4j slf4j log4j12 1.7.25 org.slf4j slf4j api 1.7.25 org.apache.kafka kafka clients 0.10.2.1 至少需要2個jar public class sendmessageproducer long end ...

Kafka消費者生產者例項

它允許發布和訂閱記錄流,類似於訊息佇列或企業訊息傳遞系統。它可以容錯的方式儲存記錄流。它可以處理記錄發生時的流。由於主要介紹如何使用kafka快速構建生產者消費者例項,所以不會涉及kafka內部的原理。乙個基於kafka的生產者消費者過程通常是這樣的 來自官網 cd kafka 2.11 0.11....