kafka生產者例項配置引數

2021-10-03 23:28:02 字數 2606 閱讀 4621

kafkaproducer中有三個引數是必填的:

bootstrap.servers: 指定生產者客戶端連線kafka集群所需的broker位址列表,格式為host1:port1,host2:port2,可以設定乙個或多個。這裡並非需要所有的broker位址,因為生產者會從給定的broker裡尋找其它的broker。

key.serializer和value.serializer:broker接收訊息必須以位元組陣列byte形式存在,kafkaproducer和producerrecord中的泛型就是key和value的型別。key.serializer和value.serializer分別用來指定key和value序列化操作的序列化器,無預設值。類的全限定名。

非必填引數

client.id:這個引數用來設定kafkaproducer對應的客戶端id,預設值為「」,如果不設定,會自動生成乙個非空字串,內容形式如:「producer-1」,「producer-2」…

retries:配置生產者重試次數,對於可重試異常,那麼只要在規定的次數內自行恢復了,就不會丟擲異常,預設是0。

retry.backoff.ms用來設定兩次重試之間的時間間隔,預設值100。

partitioner.class:顯示配置使用哪個分割槽器。

interceptor.classes:指定自定義***,多個傳list集合。

buffer.memory:生產者客戶端recordaccumulator快取大小,預設值為33554432b,即32m。

batch.size:producerbatch可以復用記憶體區域的大小

max.block.ms:最大阻塞時間,recordaccumulator快取不足時或者沒有可用的元資料時,kafkaproducer的send()方法呼叫要麼被阻塞,要麼丟擲異常,此引數的預設值為60000,即60s。

metadata.max.age.ms:當客戶端超過這個時間間隔時就會更新元資料資訊預設值300000,即5分鐘。元資料指集群中有哪些主題,主題有哪些分割槽,每個分割槽leader副本在哪個節點上,follower副本在哪個節點上,哪些副本在ar,isr等集合中,集群中有哪些節點等等。

acks:用來指定必須要多少個副本收到這條訊息,之後生產者才會認為這條訊息成功寫入。acks是生產者客戶端中乙個非常重要的引數,它涉及訊息的可靠性和吞吐量之間的權衡。acks引數有三種型別的值:字串型別,不是整型。

​ 1:只要分割槽的leader副本寫入成功,生產者就會收到來自服務端的成功響應。如果再被其它follower副本拉取前leader副本崩潰,那麼此時訊息還是會丟失。

​ 0:生產者傳送訊息之後不需要等待任何服務端的響應。如果在訊息傳送到寫入kafka的過程**現了某些異常,導致kafka並沒有收到這條訊息,那麼生產者也無從得知,訊息會丟失。

​ -1或者all:生產者傳送訊息之後,需要等待isr中所有副本成功寫入訊息之後才能收到來自服務端的成功響應。

max.request.size:用來限制生產者客戶端能傳送的訊息的最大值,預設值為1048576b,即1mb。這個引數涉及到其它引數的聯動,比如broker端的message.max.bytes引數。對kafka沒有足夠把控的時候不要更改此引數。

connections.max.idle.ms:用來指定多久之後關閉閒置的連線,預設值540000(ms),即9min

linger.ms:生產者傳送producerbatch之前等待更多訊息(producerrecoder)加入producerbatch的時間,預設值為0。生產者客戶端會在producerbatch被填滿或者等待時間超過linger.ms時傳送出去。增大這個引數的值會增加訊息的延遲,但同時會提高吞吐量。

receive.buffer.bytes:用來設定socket接收緩衝區的大小,預設值為32768(b),即32kb,如果設定為-1,則使用作業系統的預設值。

send.buffer.bytes:用來設定socket傳送緩衝區的大小,預設值為131072(b),即128kb,如果設定為-1,則使用作業系統預設值。

request.timeout.ms:配置producer等待請求響應的最長時間,預設值為30000(ms),請求超時之後可以進行重試。注意這個引數需要比broker端引數replica.lag.time.max.ms的值要大,這樣可以減少因客戶端重試而引起的訊息重複的概率。

enable.idempotence:是否開啟冪等性功能,預設值false

max.in.flight.requests.per.connection:限制每個連線,也就是客戶端與node之間的連線最多快取請求數,預設值5

transactional.id:設定事物id,必須唯一,預設值null

Kafka生產者引數優化

properties props new properties 集群位址,多個伺服器用 分隔 props.put bootstrap.servers 192.168.72.141 9092,192.168.72.142 9092,192.168.72.143 9092 重新傳送訊息次數,到達次數返回...

Kafka消費者生產者例項

它允許發布和訂閱記錄流,類似於訊息佇列或企業訊息傳遞系統。它可以容錯的方式儲存記錄流。它可以處理記錄發生時的流。由於主要介紹如何使用kafka快速構建生產者消費者例項,所以不會涉及kafka內部的原理。乙個基於kafka的生產者消費者過程通常是這樣的 來自官網 cd kafka 2.11 0.11....

Kafka消費者生產者例項

2017年07月30日18 22 56 rhwayfunn 閱讀數 13818標籤 kafka 更多 個人分類 分布式系統 為了更為直觀展示卡夫卡的訊息生產消費的過程,我會從基於控制台和基於應用兩個方面介紹使用例項.kafka是乙個分布式流處理平台,具體來說有三層含義 它允許發布和訂閱記錄流,類似於...