Kafka之生產者

2021-10-01 04:20:36 字數 3263 閱讀 1030

(1)方便在集群中擴充套件,乙個topic可以有多個partition組成,而每個partition可以通過調整以適應它所在的機器

(2)可以提高併發,因為可以以partition為單位讀寫

我們需要將生產者傳送的資料封裝成乙個producerrecord物件。

(1)指明partition的情況下,直接將指明的值作為partition值;

(2)沒指明partition,但有key的值,將key的hash值與topic的partition數進行取餘得到partition值

(3)既沒有partition又沒有key的值時,第一次呼叫時隨機生成乙個整數(後面每次呼叫在這個整數上自增),將這個值與topic可用的partition總數取餘得到partition值,也就是常說的round-robin演算法

當topic的每個partition收到producer傳送的資料後,都需要向producer傳送ack(acknowledgement 確認收到),如果producer收到ack後,就會進行下一輪的傳送,否則重新傳送資料。

確保全部的follower同步完成之後,才可以發ack,這樣能保證leader掛掉之後,能在follower中選舉出新的leader。(follower 同步方案還有第二種,半數副本以上完成同步,也可以發ack,但缺點是選舉新的leader時,容忍n臺節點故障,但需要2n+1個副本;而全部同步完成選新leader,容忍n臺節點故障,只需要n+1個副本。不過第二種方案會造成大量資料的冗餘,第一種雖然網略延遲高,但對kafka影響較小)

leader維護了乙個動態的in-sync replica set (isr),意為和leader保持同步的follower集合。當isr中的follower完成資料同步之後,leader就會給producer傳送ack。如果follower長時間未向leader同步資料,則該follower將被踢出isr,該時間閾值由replica.lag.time.max.ms引數設定。leader發生故障之後,就會從isr中選舉新的leader。

對某些不太重要的資料,對資料的可靠性要求不是很高,能容忍資料的少量丟失,所以沒必要等isr中的follower全部接收成功。

kafka提供了三種可靠級別

acks引數配置:

0:producer不等broker的ack,這樣延遲最低,但broker接收到還沒寫入磁碟就已經返回,當broker故障時有可能丟失資料

1:producer等待broker的ack,leader落盤成功後返回ack。如果follower還沒同步,但這個時候leader返回了ack,並且leader故障,此時producer已經收到ack,預設傳送成功,不會繼續傳送上一條資訊,這就導致資料丟失

-1:prodicer等待broker的ack,partition的leader和follower全部落盤成功後才返回ack。如果在follower同步之後,broker傳送ack之前,leader出現故障,會從follower中選舉乙個leader,但是此時producer沒收到ack,會重新傳送上條資訊,但是副本已經同步完故障前的leader了,這樣就會造成資料重複

(1)follower故障

follower發生故障會被臨時踢出isr, 待該follower恢復後,會讀取本地磁碟上次記錄的hw, 並將log檔案高於hw的部分擷取掉,從hw開始向leader進行同步。等該follower的leo大於等於該partition的hw,即follower追上leader之後,就可以重新加入isr了

(2)leader故障

leader發生故障,會從isr中選出乙個新的leader,為保證副本間資料一致性,其餘follower會先將各自的log檔案高於hw的部分截掉,然後從新的leader同步資料。

注意:這只能保證副本之間的資料一致性,並不能保證資料不丟失或不重複

有些非常重要的資訊,比如交易資料,要求資料既不重複也不能丟失,即exactly once語義

將伺服器的ack級別設定為-1,可以保證producer到server之間不會丟失資料,即at least once語義

0.11版本的kafka,引入了一項重大特性:冪等性。所謂的冪等性就是指producer不論向server傳送多少次重複資料,server端都只會持久化一條。

kafka的exactly once語義:

at least once + 冪等性 = exactly once

如何啟用冪等性:

只需要將producer的引數中enable.idompotence設定為true即可(kafka自動將acks屬性設為-1,並將retries屬性設為integer.max_value。)。

kafka的冪等性實現其實就是將原來下游需要做的去重放在了資料上游。

開啟冪等性的producer在初始化的時候會被分配乙個pid,發往同一partition的訊息會附帶sequence number。而broker端會對做快取,當具有相同主鍵的訊息提交時,broker只會持久化一條。

producer重啟,pid就會變化,所以冪等性只能保證單會話的exactly once

作用:開啟事務後,生產者能實現跨會話的冪等性,也就是說能實現不同分割槽、不同topic傳送的多條資訊的原子性。

為了實現跨分割槽跨會話的事務,需要引入乙個全域性唯一的transaction id(存在zookeeper上邊),並將producer獲得的pid和transaction id繫結。這樣當producer重啟後就可以通過正在進行的transaction id獲得原來的pid。

kafka 生產者(二)

想要提高生產者的吞吐量可以通過調整一下4個引數來實現 batch.size 批次大小,預設16k linger.ms 等待時間,修改為5 100ms recordaccumulator 緩衝區大小,修改為64m 實現 public class customproducerparameters 關閉資...

Kafka之生產者 筆記一

文章內容選自 kafka技術內幕 1.訊息系統通常是由三大塊組成 生產者 消費者 訊息 功能 生產者會將訊息寫入訊息 中,消費者會從訊息 中讀取訊息。對於訊息 而言,消費者和生產者都是客戶端。2.通訊步驟 1.生產者客戶端應用產生訊息。2.生產者包裝訊息到請求頭中,傳送到客戶端。3.服務端物件負責接...

kafka生產者分割槽策略

kafka生產者 分割槽策略 分割槽的原因 1 方便在集群中擴充套件,每個partition可以通過調整以適應它所在的機器,而乙個topic又 可以有多個partition組成,因此整個集群就可以適應任意大小的資料了 2 可以提高併發,因為可以以partition為單位讀寫了。分割槽的原則 1 指明...