Kafka學習筆記(二) 生產者詳解

2021-10-11 09:06:52 字數 3699 閱讀 7603

2、傳送原理刨析

3、其他生產者引數

訊息要到網路上進行傳輸,必須進行序列化,而序列號器的作用就是入磁。

kafka提供了預設的字串序列化器(org.apache.kafka.common.serialization.stringserializer),還有整型(integerserializer)和位元組陣列(bytesserializer)序列化器,這些序列化器都實現了介面(org.apache.kafka.common.serialization.serializer)基本上能夠滿足大部分場景的需求。

/**

* 自定義序列化器

*/public

class

companyserializer

implements

serializer

@override

public

byte

serializer

(string topic,company data)

byte

name,address;

tryelse

if(data.

getaddress()

!=null)

else

bytebuffer buffer = bytebuffer.

allocate(4

+4+name.length+address.length)

; buffer.

putint

(name.length)

; buffer.

put(name)

; buffer.

putint

(address.length)

; buffer.

put(address)

;return buffer.

array()

;}catch

(unsupportedencodinfexception e)

return

newbyte[0

];}@override

public

void

close()

}

本身kafka有自己的分割槽策略的,如果未指定,就會使用預設的分割槽策略

kafka根據傳遞訊息的key來進行分割槽的分配,即hash(key)%nnumpartitions。如果key相同的化,那麼就會分配到同一分割槽。

原始碼:org.apache.kafka.clients.producer.internals.defaultpartitioner

public

intpartition

(string topic,object key,

byte

keybytes,object value,

byte

valuebytes,cluster cluster)

else

}else

}

/**

* 自定義分割槽器

*/public

class

definepartitioner

implements

partitioner

else

}@override

public

void

close()

}

//自定義分割槽器的使用

props.

put(producerconfig.partitioner_class_config,definepartitioner.

class

.getname()

);

producer***(interceptor)是相當新的功能,它和consumer端interceptor是在kafka0.10版本被引入的,主要用於實現client端的定製化控制邏輯。

生產者***可以用在訊息傳送前做一些準備工作。

使用場景

按照某個規則過濾掉不符合要求的訊息

修改訊息的內容

統計類需求

/**

* 自定義***

​ 訊息傳送的過程中,設計到兩個執行緒協調工作,主線程首先將業務資料封裝成producerrecord物件,之後呼叫send()方法將訊息方入recordaccumulator(訊息收集器,也可以理解為主執行緒與sender執行緒直接的緩衝區)中暫存,sender執行緒負責將訊息構成請求,並最終執行網路i/o的執行緒,它從recordaccumulator中取出訊息並批量傳送出去,需要注意的是,kafkaproducer是執行緒安全的,多個執行緒間可以共享使用同乙個kafkaproducer物件

這個引數用來指定分割槽中必須有多少個副本收到這條訊息,之後生產者才會認為這條訊息是寫入成功的。acks是生產者客戶端中非常重要的乙個引數,它涉及到訊息的可靠性和吞吐量之間的權衡。

注意:acks引數配置的時乙個字串型別,而不是整數型別,如果配置為證書型別會丟擲異常

生產者從伺服器收到的錯誤有可能時臨時性的錯誤(比如分割槽找不到首領),在這種情況下,如果達到了retries設定的次數,生產者會放棄重試並返回錯誤,預設情況下,生產者會在每次重試之間等待100ms,可以通過retry.backoff.ms引數來修改這個時間間隔。

當有多個訊息被傳送到同乙個分割槽時,生產者會把他們放在同乙個批次裡,該引數指定了乙個批次可能使用的記憶體大小,按照位元組數計算,而不是訊息個數,當批次被填滿,批次裡的所有訊息會被傳送出去。不過生產者並不一定都會等到批次被填滿才傳送,半滿的批次,甚至只包含乙個訊息的批次也可能被傳送。所以就算把batch.size設定的很大,也不會造成延遲,只會占用更多的記憶體而已,如果設定的大小,生產者會因為頻繁傳送訊息而增加一些額外的開銷。

該引數用於控制生產者傳送的請求大小,它可以指定能傳送的單個訊息的最大值,也可以指單個請求裡所有訊息的總大小。broker對可接收的訊息最大值也有自己的限制(message.max.size),所以兩邊的配置最好匹配,避免生產者傳送的訊息被broker拒絕。

kafka學習筆記(二) 生產者介紹

生產者傳送訊息的過程 分割槽原則 資料可靠性保證 acsk 0 producer不等待broker的ack,這樣可以提供最低的延遲,broker一接收到還沒有寫入到磁碟就返回ack,當broker出現故障的時候可能會丟失資料 1 producer等待broker返回ack,partition的lea...

kafka學習總結006 生產者事務

前面提到了,kafka0.11.0版本引入的冪等性只能保證分割槽級別的at exactly once語義 如圖,producer向三個分割槽分別生產10條資料,前兩個生產成功,寫第三個分割槽時,producer掛掉 producer重啟後,重新向三個分割槽寫入資料 此時producer的pid變化,...

kafka 生產者(二)

想要提高生產者的吞吐量可以通過調整一下4個引數來實現 batch.size 批次大小,預設16k linger.ms 等待時間,修改為5 100ms recordaccumulator 緩衝區大小,修改為64m 實現 public class customproducerparameters 關閉資...