Kafka相關知識

2021-08-22 07:11:01 字數 2930 閱讀 3700

kafka是乙個高吞吐量的分布式的發布訂閱系統。

一、生產者大概執行流程

1.一條訊息過來首先會被封裝成乙個producerrecord物件

2.接下來要對這個物件進行序列化,因為kafka的訊息要從客戶端傳到伺服器端,涉及到網路傳輸,所以需要實現序列化。kafka提供了預設的序列化方式,也支援自定義序列化。

3.訊息序列化完了之後,對訊息要進行分割槽,分割槽的時候需要獲取集群的元資料。分割槽的這個過程很關鍵,因為這個時候就決定了我們這條訊息會被傳送到kafka服務端的哪個主題哪個分割槽了。

4.分好區的訊息不是直接被傳送到服務端,而是放入了生產者的乙個快取裡。在這個快取裡,多條訊息會被封裝成乙個批次,預設的乙個批次大小為16k。

5.sender執行緒啟動後會從快取裡去獲取可以傳送的批次。

6.sender執行緒把乙個乙個批次傳送到服務端。

1)producer:訊息生產者,發布訊息到kafka集群的終端或服務

2)broker:kafaka集群中包含的伺服器

3)topic:每條發布到kafka集群的訊息屬於的類別,即kafka是面向topic的

4)partition:每個topic包含乙個或者多個partition,kafka的分配單位是partition

5)comsumer:從kafka集群中消費訊息的終端或服務

6)comsumer group:每個consumer都屬於乙個consumer group,每條訊息只能被consumer group中的乙個comsumer group消費

7)replica:partition的副本,保障partition的高可用

8)leader:replica中的乙個角色,producer和consumer只跟leader互動

9)follower:replica中的乙個角色,從leader中複製資料

10)controller:kafka集群中的乙個伺服器,用來進行leader election以及各種failover

11)zookeeper:kafka通過zookeeper來儲存集群的meta資訊

對於傳統的message queue而言,一般會刪除已經被消費的訊息,而kafka集群會保留所有的訊息,無論其被消費與否。當然,因為磁碟的限制,不可能永久保留所有的資料,因此kafka提供兩種策略去刪除舊資料。一是基於時間,二是基於partition檔案大小。

kafka會為每乙個consumer group保留一些metadata資訊——當前消費的訊息的position,即offset。這個offect由consumer控制。正常情況下consumer會在消費完一條訊息後線性增加這個offset。當然,consumer也可將offset設成乙個較小的值,重新消費一些訊息。因為offset是無狀態的,它不需要標記哪些訊息被消費過,不需要通過broker去保證同乙個consumer group只有乙個consumer能消費某一條訊息,因此也就不需要鎖機制,這也為kafka的高吞吐率提供了有力保障。

c++使用librdkafka庫實現kafka的消費例項

1) 建立kafka配置

rdkafka::conf *conf = nullptr;

conf = rdkafka::conf::create(rdkafka::conf::conf_global);

2) 設定kafka各項引數

conf->set(「bootstrap.servers」, brokers_, errstr);  //設定broker list

conf->set(「group.id」, groupid_, errstr);  //設定consumer group

conf->set(「max.partition.fetch.bytes」, strfetch_num, errstr);  //每次從單個分割槽中拉取訊息的最大尺寸

3) 建立kafka topic配置

rdkafka::conf *tconf = nullptr;

tconf = rdkafka::conf::create(rdkafka::conf::conf_topic);

4) 設定kafka topic引數

if(tconf->set(「auto.offset.reset」, 「smallest」, errstr))

5) 建立kafka consumer例項

kafka_consumer_=rdkafka::consumer::create(conf, errstr);

6) 建立kafka topic

rdkafka::topic::create(kafka_consumer_, topics_, tconf, errstr);

7) 啟動kafka consumer例項

rdkafka::errorcode resp = kafka_consumer_->start(topic_, partition_, offset_);

8) 消費kafka

kafka_consumer_->consume(topic_, partition_, timeout_ms);

9) 阻塞等待訊息

kafka_consumer_->poll(0);

10)停止消費

kafka_consumer_->stop(topic_, partition_);

11)銷毀consumer例項

rdkafka::wait_destroyed(5000);

乙個典型的kafka集群中包含若干個producer(可以是web前端產生的page view,或者是伺服器日誌,系統cpu、memory等),若干broker(kafka支援水平擴充套件,一般broker數量越多,集群吞吐率越高),若干consumer group,以及乙個zookeeper集群。kafka通過zookeeper管理集群配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式將訊息發布到broker,consumer使用pull模式從broker訂閱並消費訊息。

kafka相關知識點總結

1 kafka是什麼 類jms訊息佇列,結合jms中的兩種模式 點對點模型,發布者 訂閱者模型 可以有多個消費者主動拉取資料,在jms中只有點對點模式才有消費者主動拉取資料。kafka是乙個生產 消費模型。producer 生產者,只負責資料生產,生產者的 可以整合到任務系統中。資料的分發策略由pr...

kafka相關命令

1.群起指令碼 bin bash case 1 in start stop esac 2.檢視主題 bin kafka topics.sh list zookeeper k8smaster 2181 3.建立主題 kafka topics.sh create zookeeper k8smaster ...

kafka知識總結

1 kafka是什麼 類jms訊息佇列,結合jms中的兩種模式,可以有多個消費者主動拉取資料,在jms中只有點對點模式才有消費者主動拉取資料。kafka是乙個生產 消費模型。producer 生產者,只負責資料生產,生產者的 可以整合到任務系統中。資料的分發策略由producer決定,預設是defa...