Kafka生產者 結合spring開發

2022-07-17 20:48:12 字數 4319 閱讀 7463

目錄spring-kafka生產端

producer向broker傳送訊息資料,需要有一定的可靠性,至少要保證資料:

1、不丟失

2、不重複

producer提供了一些引數,在編寫producer是進行合理設定和編寫,就可以保證資料的可靠性。

acks 引數配置

為保證producer傳送的資料能夠可靠的傳送到指定topic,topic的每個partition收到訊息後,都需要向producer傳送ack(acknowledgement確認收到),如果producer收到 ack,就會進行下一輪的傳送,否則重新傳送資料。

0: producer 不等待 broker 的 ack,這一操作提供了乙個最低的延遲, broker 一接收到還沒有寫入磁碟就已經返回,當 broker 故障時有可能丟失資料;

1: producer 等待 broker 的 ack, partition 的 leader 落盤成功後返回 ack,如果在 follower同步成功之前 leader 故障,那麼將會丟失資料;

-1(all) : producer 等待 broker 的 ack, partition 的 leader 和 follower 全部落盤成功後才返回 ack。但是如果在 follower 同步完成後, broker 傳送 ack 之前leader 發生故障,那麼會造成資料重複。

exactly once 語義

當ack級別設定為-1的時候,可以保證producer到broker之間不會丟失資料,即at

least once 語義 。相對的,將伺服器ack級別設定為0,可以保證生產者每條訊息只會被傳送一次,即at most once 語義 。

at least once 可以保證資料不丟失,但是不能保證資料不重複;相對的, at least once可以保證資料不重複,但是不能保證資料不丟失。

對於一些重要資訊,我們要求既不能重複也不能丟失,這時我們需要使用exactly once 語義 。0.11 版本的 kafka,引入了一項重大特性:冪等性。 所謂冪等性就是producer無論向broker傳送了多少次重複資料,broker都只會持久化一條。冪等性結合at least once語義,就結合成了kafka的exactly once語義。

at least once + 冪等性 = exactly once

啟動冪等性,只需要將producer的引數enable.idompotence 設定為true,ack設定為-1即可。

開啟冪等性的producer在初始化的時候會被分配乙個pid,發往同乙個分割槽的訊息會附帶sequence number(自動增長)。broker端會對做快取,當具有相同主鍵的訊息提交的時候,broker只會持久化一條訊息。

msg1

msg2

msg2

但是,pid重啟就會變化,同時不同分割槽也會有不同主鍵,所以冪等性無法保證跨分割槽跨會話。這裡我們就需要引進kafka事務。

事務kafka 從 0.11 版本開始引入了事務支援。事務可以保證 kafka 在 exactly once 語義的基礎上,生產和消費可以跨分割槽和會話,要麼全部成功,要麼全部失敗 。為了實現跨分割槽跨會話事務,引入乙個全域性唯一的transaction id ,將pproducer的pid和transaction id進行繫結,這樣,當producer重啟後,就可以通過transaction id 獲得原來的 pid。這個引數通過客戶端程式來進行設定 。

我們使用kafka訊息事務的場景有以下兩種:

在一次業務中,存在消費訊息,又存在生產訊息。此時如果訊息生產失敗,那麼消費者需要回滾。這種情況稱為consumer-transform-producer

在一次業務中,存在多次生產訊息,其中後續生產的訊息丟擲異常,前置生產的訊息需要回滾。

事務要求生產者開啟冪等性特性,因此通過將transactional.id引數設定為非空從而開啟事務特性的同時

需要將producerconfig.enable_idempotence_config設定為true(預設值為true),如果顯示設

置為false,則會丟擲異常。

以上是保證producer傳送資料可靠性保證的相關引數,結合spring-kafka的具體使用如下。

spring-kafkaproducer.xml配置:

<?xml version="1.0" encoding="utf-8"?>

部分重要引數詳解:

acks:

​ 這個引數用來指定分割槽中必須有多少個副本收到這條訊息,之後生產者才會認為這條訊息時寫入成功

的。retries :

​ 生產者從伺服器收到的錯誤有可能是臨時性的錯誤(比如分割槽找不到首領)。在這種情況下,如果達到

了 retires 設定的次數,生產者會放棄重試並返回錯誤。預設情況下,生產者會在每次重試之間等待

100ms,可以通過 retry.backoff.ms 引數來修改這個時間間隔。

batch.size :

​ 當有多個訊息要被傳送到同乙個分割槽時,生產者會把它們放在同乙個批次裡。該引數指定了乙個批次可

以使用的記憶體大小,按照位元組數計算,而不是訊息個數。當批次被填滿,批次裡的所有訊息會被傳送出

去。不過生產者並不一定都會等到批次被填滿才傳送,半滿的批次,甚至只包含乙個訊息的批次也可能

被傳送。所以就算把 batch.size 設定的很大,也不會造成延遲,只會占用更多的記憶體而已,如果設定

的太小,生產者會因為頻繁傳送訊息而增加一些額外的開銷。

max.request.size :

​ 該引數用於控制生產者傳送的請求大小,它可以指定能傳送的單個訊息的最大值,也可以指單個請求裡

所有訊息的總大小。 broker 對可接收的訊息最大值也有自己的限制( message.max.size ),所以兩

邊的配置最好匹配,避免生產者傳送的訊息被 broker 拒絕。

linger.ms:批處理延遲時間上限

buffer.memory:批處理緩衝區

enable.idempotence:是否開啟冪等性

producerlistener類

訊息傳送後的**方法,注意的是,這裡的監聽回顯的資料時要傳送的資料,不是返回的資料,可以通過日誌來觀察傳送資料是否正確。

public class kafkasendresulthandler implements producerlistener 

public void onerror(string topic, integer partition, object key, object value, exception e)

public boolean isinterestedinsuccess()

}

producerclient類

對kafkatemplate的再一次封裝,kafka在訊息傳送的時候傳送方式可以分為同步傳送和非同步傳送。

同步傳送:

​ 同步傳送的意思就是,一條訊息傳送之後,會阻塞當前執行緒, 直至返回 ack。

//同步傳送

public void syncsend()

非同步傳送:

//非同步傳送

public void asyncsend()

@override

public void onfailure(throwable ex)

});}

producerclient對kafkatemplate的封裝(不帶事務)

這裡只封裝了最簡單的傳送方法,同時可對其他傳送方法進行封裝,只需要修改傳參即可。

public class producerclient  else 

}else

} catch (interruptedexception e) catch (executionexception e)

system.out.println("kafkaservers response : "+m);

}}

public class kafkamesconstant
測試一下
public class excuter 

}

控制台結果:(我這裡沒有使用日誌輸出,在實際開發中需要使用日誌開發)

producerlistener started

kafka message send successful : ---topic:topic2---partition:null---key:null---value:2019-11-19 02:57:07---recordmetadata:topic2-2@4928

kafkaservers response :

Kafka之生產者

1 方便在集群中擴充套件,乙個topic可以有多個partition組成,而每個partition可以通過調整以適應它所在的機器 2 可以提高併發,因為可以以partition為單位讀寫 我們需要將生產者傳送的資料封裝成乙個producerrecord物件。1 指明partition的情況下,直接將...

kafka 生產者(二)

想要提高生產者的吞吐量可以通過調整一下4個引數來實現 batch.size 批次大小,預設16k linger.ms 等待時間,修改為5 100ms recordaccumulator 緩衝區大小,修改為64m 實現 public class customproducerparameters 關閉資...

kafka生產者分割槽策略

kafka生產者 分割槽策略 分割槽的原因 1 方便在集群中擴充套件,每個partition可以通過調整以適應它所在的機器,而乙個topic又 可以有多個partition組成,因此整個集群就可以適應任意大小的資料了 2 可以提高併發,因為可以以partition為單位讀寫了。分割槽的原則 1 指明...