kafka 生產者(二)

2022-10-09 08:06:10 字數 2239 閱讀 1221

想要提高生產者的吞吐量可以通過調整一下4個引數來實現

batch.size:批次大小,預設16k

linger.ms:等待時間,修改為5-100ms

recordaccumulator:緩衝區大小,修改為64m

**實現

public

class

customproducerparameters

//關閉資源

kafkaproducer.close();}}

傳送流程ack 應答級別

資料完全可靠條件=ack級別設定為-1+分割槽副本大於等於2+isr裡應答的最小副本數量大於等於2

可靠性總結:

測試

public

class

customproduceracks

//3、關閉資源

kafkaproducer.close();}}

3.1、資料傳輸語義

kafka0.11版本以後,引入了一項重大特性:冪等性和事務

3.2、冪等性

冪等性指producer不論向broker傳送多少次重複資料,broker端都只會持久化一條,保證了不重複。

精確一次(exactlyonce)=冪等性+至少一次(ack=-1+分割槽副本數》=2+isr最小副本數量》=2)

重複資料的判斷標準:具有相同主鍵的訊息提交時,broker只會持久化一條。其中pid是kafka每次重啟都會分配乙個新的;partition表示分割槽號;sequencenumber是單調自增的。所以冪等性只能保證的是在單分割槽單會話內不重複

如何使用冪等性:開啟引數enable.idempotence 預設為true,false關閉

3.3、事務

說明:開啟事務,必須開啟冪等性。

producer 在使用事務功能前,必須先自定義乙個唯一的transactional.id。有了transactional.id,即使客戶端掛掉了,它重啟後也能繼續處理未完成的事務

事務測試

public

class

customproducertranaction

//提交事務

kafkaproducer.committransaction();

} catch

(exception e)

finally}}

單分區內,有序(有條件的,詳見下節);多分割槽,分割槽與分區間無序;

kafka在1.x版本之前保證資料單分割槽有序的條件 max.in.flight.requests.per.connection=1(不需要考慮是否開啟冪等性)

kafka在1.x及以後版本保證資料單分割槽有序分兩者情況

未開啟冪等性 max.in.flight.requests.per.connection需要設定為1

開啟冪等性 max.in.flight.requests.per.connection需要設定小於等於5

原因說明:因為在kafka1.x以後,啟用冪等後,kafka服務端會快取producer發來的最近5個request的元資料,故無論如何,都可以保證最近5個request的資料都是有序的。

Kafka之生產者

1 方便在集群中擴充套件,乙個topic可以有多個partition組成,而每個partition可以通過調整以適應它所在的機器 2 可以提高併發,因為可以以partition為單位讀寫 我們需要將生產者傳送的資料封裝成乙個producerrecord物件。1 指明partition的情況下,直接將...

kafka生產者分割槽策略

kafka生產者 分割槽策略 分割槽的原因 1 方便在集群中擴充套件,每個partition可以通過調整以適應它所在的機器,而乙個topic又 可以有多個partition組成,因此整個集群就可以適應任意大小的資料了 2 可以提高併發,因為可以以partition為單位讀寫了。分割槽的原則 1 指明...

kafka生產者API操作

建立producer傳送資訊給消費者 設定配置相關 private val prop new properties prop.setproperty bootstrap.servers mypc01 9092,mypc02 9092,mypc03 9092 prop.setproperty acks...