kafka 集群管理 建立乙個topic流程

2021-10-22 22:05:18 字數 1103 閱讀 3175

topiccommand.createtopic

def createtopic

(zkutils: zkutils, opts: topiccommandoptions)

else

{//往目錄裡面寫topic的分割槽的分配方案

->

writetopicpartitionassignment

(zkutils, topic, partitionreplicaassignment, update)

監聽變化

kafkacontroller.oncontrolle***ilover

// 監聽分割槽的變化

partitionstatemachine.

registerlisteners()

-> registertopicchangelistener

// /brokers/topics

zkutils.zkclient.

subscribechildchanges

(brokertopicspath, topicchangelistener)

-> topicchangelistener =

newtopicchangelistener()

//傳送元資料變更的請求

controller.onnewtopiccreation

onnewpartitioncreation

replicastatemachine.handlestatechanges

//傳送請求給其他所有的broker

brokerrequestbatch.

sendrequeststobrokers

(controller.epoch)

//todo 傳送請求 apikeys.update_metadata_key

controller.

sendrequest

(broker, apikeys.update_metadata_key,

some

(version)

, updatemetadatarequest, null)

至此,與上篇一致,都會更新元資料

如何在Kafka上建立乙個Topic

bin kafka topics.sh zookeeper 192.168.2.225 2183 config mobile mq create topic test.example replication factor 2 partitions 24 topic指定topic name parti...

如何選擇乙個Kafka集群中的主題分割槽的數量

kafka集群中分割槽應該設定多少比較合適,這是乙個面對眾多開發者共同的難題,這篇文章的目標就是來解釋一些重要的因素,同時會提供一些簡單的公式。首先我們要有個認知,那就是分割槽 partition 是kafka中的併發單位。從生產者和broker層面來說,寫入訊息到不同的分割槽是一種完全的並發行為,...

建立乙個struct,來管理乙個動態增長的陣列

c 程式設計思想,在介紹資料封裝給了乙個cstash的例子,大概的思想是,建立乙個struct,來管理乙個動態增長的陣列。這個陣列可以接受任何型別的基本資料型別。包括示例中的int和char,乙個能儲存多種資料型別的底層資料型別,當然是最小的型別也就是sizeof運算子返回為1的資料型別,綜合考慮,...