Spring Kafka中關於Kafka的配置引數

2022-09-11 00:09:37 字數 4721 閱讀 2929

1

#################consumer的配置引數(開始)#################

2 #如果'enable.auto.commit'為true,則消費者偏移自動提交給kafka的頻率(以毫秒為單位),預設值為5000。

3 spring.kafka.consumer.auto-commit-interval;45

#當kafka中沒有初始偏移量或者伺服器上不再存在當前偏移量時該怎麼辦,預設值為latest,表示自動將偏移重置為最新的偏移量

6#可選的值為latest, earliest, none

7 spring.kafka.consumer.auto-offset-reset=latest;89

#以逗號分隔的主機:埠對列表,用於建立與kafka群集的初始連線。

10 spring.kafka.consumer.bootstrap-servers;

1112

#id在發出請求時傳遞給伺服器;用於伺服器端日誌記錄。

13 spring.kafka.consumer.client-id;

1415

#如果為true,則消費者的偏移量將在後台定期提交,預設值為true

16 spring.kafka.consumer.enable-auto-commit=true;17

18#如果沒有足夠的資料立即滿足「fetch.min.bytes」給出的要求,伺服器在回答獲取請求之前將阻塞的最長時間(以毫秒為單位)

19#預設值為500

20 spring.kafka.consumer.fetch-max-wait;

2122

#伺服器應以位元組為單位返回獲取請求的最小資料量,預設值為1,對應的kafka的引數為fetch.min.bytes。

23 spring.kafka.consumer.fetch-min-size;

2425

#用於標識此使用者所屬的使用者組的唯一字串。

26 spring.kafka.consumer.group-id;

2728

#心跳與消費者協調員之間的預期時間(以毫秒為單位),預設值為3000

29 spring.kafka.consumer.heartbeat-interval;

3031

#金鑰的反序列化器類,實現類實現了介面org.apache.kafka.common.serialization.deserializer

32 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.stringdeserializer

3334

#值的反序列化器類,實現類實現了介面org.apache.kafka.common.serialization.deserializer

35 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.stringdeserializer

3637

#一次呼叫poll()操作時返回的最大記錄數,預設值為500

38 spring.kafka.consumer.max-poll-records;

39#################consumer的配置引數(結束)#################

40#################producer的配置引數(開始)#################

41#procedure要求leader在考慮完成請求之前收到的確認數,用於控制傳送記錄在服務端的持久化,其值可以為如下:

42 #acks = 0 如果設定為零,則生產者將不會等待來自伺服器的任何確認,該記錄將立即新增到套接字緩衝區並視為已傳送。在這種情況下,無法保證伺服器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設定為-1。

43 #acks = 1這意味著leader會將記錄寫入其本地日誌,但無需等待所有副本伺服器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄後立即失敗,但在將資料複製到所有的副本伺服器之前,則記錄將會丟失。

44 #acks = all 這意味著leader將等待完整的同步副本集以確認記錄,這保證了只要至少乙個同步副本伺服器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當於acks = -1的設定。

45 #可以設定的值為:all, -1, 0, 1

46 spring.kafka.producer.acks=1

4748

#每當多個記錄被傳送到同一分割槽時,生產者將嘗試將記錄一起批量處理為更少的請求, 

49#這有助於提公升客戶端和伺服器上的效能,此配置控制預設批量大小(以位元組為單位),預設值為16384

50 spring.kafka.producer.batch-size=16384

5152

#以逗號分隔的主機:埠對列表,用於建立與kafka群集的初始連線

53 spring.kafka.producer.bootstrap-servers

5455

#生產者可用於緩衝等待傳送到伺服器的記錄的記憶體總位元組數,預設值為33554432

56 spring.kafka.producer.buffer-memory=33554432

5758

#id在發出請求時傳遞給伺服器,用於伺服器端日誌記錄

59 spring.kafka.producer.client-id

6062 #它還接受'uncompressed'以及'producer',分別表示沒有壓縮以及保留生產者設定的原始壓縮編解碼器,

63#預設值為producer

64 spring.kafka.producer.compression-type=producer

6566

#key的serializer類,實現類實現了介面org.apache.kafka.common.serialization.serializer

67 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.stringserializer

6869

#值的serializer類,實現類實現了介面org.apache.kafka.common.serialization.serializer

70 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.stringserializer

7172

#如果該值大於零時,表示啟用重試失敗的傳送次數

73spring.kafka.producer.retries

74#################producer的配置引數(結束)#################

75#################listener的配置引數(結束)#################

76 #偵聽器的ackmode,參見https://

docs.spring.io/spring-kafka/reference/htmlsingle/#committing-offsets

77#當enable.auto.commit的值設定為false時,該值會生效;為true時不會生效

78 spring.kafka.listener.ack-mode;

7980

#在偵聽器容器中執行的執行緒數

81spring.kafka.listener.concurrency;

8283

#輪詢消費者時使用的超時(以毫秒為單位)

84 spring.kafka.listener.poll-timeout;

8586

#當ackmode為「count」或「count_time」時,偏移提交之間的記錄數

87 spring.kafka.listener.ack-count;

8889

#當ackmode為「time」或「count_time」時,偏移提交之間的時間(以毫秒為單位)

90 spring.kafka.listener.ack-time;

91 #################listener的配置引數(結束)#################

一、線上問題

出現kafka手動提交失敗,堆疊資訊如下:

通過堆疊資訊可以看出,有兩個重要引數: session.timeout  和 max.poll.records

session.timeout.ms : 在使用kafka的團隊管理設施時,用於檢測消費者失敗的超時時間。消費者定期傳送心跳來向經紀人表明其活躍度。如果**在該會話超時到期之前沒有收到心跳,那麼**將從該組中刪除該消費者並啟動重新平衡。

max.poll.records : 在一次呼叫poll()中返回的最大記錄數。

根據堆疊的提示,他讓增加 session.timeout.ms 時間 或者 減少 max.poll.records。

總結:1、 使用kafka時,消費者每次poll的資料業務處理時間不能超過kafka的max.poll.interval.ms,該引數在kafka0.10.2.1中的預設值是300s,所以要綜合業務處理時間和每次poll的資料數量。

2、j**a執行緒池大小的選擇,

對於cpu密集型應用,也就是計算密集型,執行緒池大小應該設定為cpu核數+1;

對於io密集型應用 ,執行緒池大小設定為    2*cpu核數+1.

** 

關於k 均值演算法

同時也參考了 k means演算法是最簡單的一種聚類演算法。演算法的目的是使各個樣本與所在類均值的誤差平方和達到最小 這也是評價k means演算法最後聚類效果的評價標準 k means聚類演算法的一般步驟 初始化。輸入基因表達矩陣作為物件集x,輸入指定聚類類數n,並在x中隨機選取n個物件作為初始聚...

關於k分位數 收藏

關於k分位數 收藏 題目 對乙個含有n個元素的集合來說,所謂的k分位數,就是能把已排序的集合分成k個大小相等的集合的 k 1個順序統計量 請給出乙個能求出含有n個元素 無序 的集合的k分位數的o nlgk 時間的演算法。分析 k select a n k if k 0 return m bit se...

關於k近鄰法的研究

在這裡我們將學習到k 近鄰法的基本原理,如何使用測量距離的方法來分類 我們會學習如何玻璃鋼python從文字檔案匯入並理解資料 再次,還要避免在存在許多資料倆元時,要避免計算計算距離時遇到的一些常見錯誤 利用k 近鄰演算法改進約會 和數字手寫識別系統。一 演算法概述 通過測量不同特徵值之間的距離的方...