kafka 非同步傳送阻塞 kafka配置檔案

2021-10-18 13:46:49 字數 4180 閱讀 7053

在kafka/config/目錄下面有3個配置檔案: 

producer.properties:生產端的配置檔案 

consumer.properties:消費端的配置檔案 

#消費者集群通過連線zookeeper來找到broker。

#zookeeper連線伺服器位址

zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

#zookeeper的session過期時間,預設5000ms,用於檢測消費者是否掛掉

zookeeper.session.timeout.ms=5000

#當消費者掛掉,其他消費者要等該指定時間才能檢查到並且觸發重新負載均衡

zookeeper.connection.timeout.ms=10000

#這是乙個時間閾值。

#指定多久消費者更新offset到zookeeper中。

#注意offset更新時基於time而不是每次獲得的訊息。

#一旦在更新zookeeper發生異常並重啟,將可能拿到已拿到過的訊息

zookeeper.sync.time.ms=2000

#指定消費

group.id=***xx

#這是乙個數量閾值,經測試是500條。

#當consumer消費一定量的訊息之後,將會自動向zookeeper提交offset資訊#注意offset資訊並不是每消費一次訊息就向zk提交

#一次,而是現在本地儲存(記憶體),並定期提交,預設為true

auto.commit.enable=true

# 自動更新時間。預設60 * 1000

auto.commit.interval.ms=1000

# 當前consumer的標識,可以設定,也可以有系統生成,

#主要用來跟蹤訊息消費情況,便於觀察

conusmer.id=***

# 消費者客戶端編號,用於區分不同客戶端,預設客戶端程式自動產生

client.id=***x

# 最大取多少塊快取到消費者(預設10)

queued.max.message.chunks=50

# 當有新的consumer加入到group時,將會reblance,此後將會

#有partitions的消費端遷移到新 的consumer上,如果乙個

#consumer獲得了某個partition的消費許可權,那麼它將會向zk

#註冊 "partition owner registry"節點資訊,但是有可能

#此時舊的consumer尚沒有釋放此節點, 此值用於控制,

#註冊節點的重試次數.

rebalance.max.retries=5

#每拉取一批訊息的最大位元組數

#獲取訊息的最大尺寸,broker不會像consumer輸出大於

#此值的訊息chunk 每次feth將得到多條訊息,此值為總大小,

#提公升此值,將會消耗更多的consumer端記憶體

fetch.min.bytes=6553600

#當訊息的尺寸不足時,server阻塞的時間,如果超時,

#訊息將立即傳送給consumer

#資料一批一批到達,如果每一批是10條訊息,如果某一批還

#不到10條,但是超時了,也會立即傳送給consumer。

fetch.wait.max.ms=5000

socket.receive.buffer.bytes=655360

# 如果zookeeper沒有offset值或offset值超出範圍。

#那麼就給個初始的offset。有smallest、largest、

#anything可選,分別表示給當前最小的offset、

#當前最大的offset、拋異常。預設largest

auto.offset.reset=smallest

# 指定序列化處理類

derializer.class=kafka.serializer.defaultdecoder

server.properties:服務端的配置檔案 

#broker的全域性唯一編號,不能重複

broker.id=0

#用來監聽鏈結的埠,producer或consumer將在此埠建立連線

port=9092

#處理網路請求的執行緒數量,也就是接收訊息的執行緒數。

#接收執行緒會將接收到的訊息放到記憶體中,然後再從記憶體中寫入磁碟。

num.network.threads=3

#訊息從記憶體中寫入磁碟是時候使用的執行緒數量。

#用來處理磁碟io的執行緒數量

num.io.threads=8

#傳送套接字的緩衝區大小

socket.send.buffer.bytes=102400

#接受套接字的緩衝區大小

socket.receive.buffer.bytes=102400

#請求套接字的緩衝區大小

socket.request.max.bytes=104857600

#kafka執行日誌存放的路徑

log.dirs=/export/servers/logs/kafka

#topic在當前broker上的分片個數

num.partitions=2

#我們知道segment檔案缺省會被保留7天的時間,超時的話就

#會被清理,那麼清理這件事情就需要有一些執行緒來做。這裡就是

#用來設定恢復和清理data下資料的執行緒數量

num.recovery.threads.per.data.dir=1

#segment檔案保留的最長時間,預設保留7天(168小時),

#超時將被刪除,也就是說7天之前的資料將被清理掉。

log.retention.hours=168

#滾動生成新的segment檔案的最大時間

log.roll.hours=168

#日誌檔案中每個segment的大小,預設為1g

log.segment.bytes=1073741824

#上面的引數設定了每乙個segment檔案的大小是1g,那麼

#就需要有乙個東西去定期檢查segment檔案有沒有達到1g,

#多長時間去檢查一次,就需要設定乙個週期性檢查檔案大小

#的時間(單位是毫秒)。

log.retention.check.interval.ms=300000

#日誌清理是否開啟

log.cleaner.enable=true

#broker需要使用zookeeper儲存meta資料

zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

#zookeeper鏈結超時時間

zookeeper.connection.timeout.ms=6000

#上面我們說過接收執行緒會將接收到的訊息放到記憶體中,然後再從記憶體

#寫到磁碟上,那麼什麼時候將訊息從記憶體中寫入磁碟,就有乙個

#時間限制(時間閾值)和乙個數量限制(數量閾值),這裡設定的是

#數量閾值,下乙個引數設定的則是時間閾值。

#partion buffer中,訊息的條數達到閾值,將觸發flush到磁碟。

log.flush.interval.messages=10000

#訊息buffer的時間,達到閾值,將觸發將訊息從記憶體flush到磁碟,

#單位是毫秒。

log.flush.interval.ms=3000

#刪除topic需要server.properties中設定delete.topic.enable=true否則只是標記刪除

delete.topic.enable=true

#此處的host.name為本機ip(重要),如果不改,則客戶端會丟擲:

#producer connection to localhost:9092 unsuccessful 錯誤!

host.name=kafka01

advertised.host.name=192.168.239.128

阻塞,非阻塞,非同步,同步

之前一直對這個概念理不太清楚,今天看到一篇文章感覺不錯 本文 老張愛喝茶,廢話不說,煮開水。出場人物 老張,水壺兩把 普通 水壺,簡稱水壺 會響的水壺,簡稱響水壺 1 老張把水壺放到火上,立等水開。同步阻塞 老張覺得自己有點傻 2 老張把水壺放到火上,去客廳看電視,時不時去廚房看看水開沒有。同步非阻...

同步 非同步 阻塞 非阻塞

故事 老王燒開水。出場人物 老張,水壺兩把 普通水壺,簡稱水壺 會響的水壺,簡稱響水壺 老王想了想,有好幾種等待方式 1.老王用水壺煮水,並且站在那裡,不管水開沒開,每隔一定時間看看水開了沒。同步阻塞 老王想了想,這種方法不夠聰明。2.老王還是用水壺煮水,不再傻傻的站在那裡看水開,跑去寢室上網,但是...

同步 非同步 阻塞 非阻塞

故事 老王燒開水。出場人物 老張,水壺兩把 普通水壺,簡稱水壺 會響的水壺,簡稱響水壺 老王想了想,有好幾種等待方式 1.老王用水壺煮水,並且站在那裡,不管水開沒開,每隔一定時間看看水開了沒。同步阻塞 老王想了想,這種方法不夠聰明。2.老王還是用水壺煮水,不再傻傻的站在那裡看水開,跑去寢室上網,但是...