golang結合Kafka訊息佇列實踐 二

2021-08-20 18:52:14 字數 2174 閱讀 5940

上週分享了如何使用go來對kafka進行生產和消費,這周接著對kafka訊息佇列的一些特性來進行使用。

上次講到kafka有個consumer group的概念,而我們使用的sarama並沒有支援,所以這次引入sarama-cluster專案 : go get 「github.com/bsm/sarama-cluster」

//第二個引數是groupid

consumer, err := kafkacluster.newconsumer(brokers, "consumer-group1", topics, config)

if err != nil

defer consumer.close()

signals := make(chan os.signal, 1)

signal.notify(signals, os.interrupt)

// 接收錯誤

go func()

}()// 列印一些rebalance的資訊

go func()

}()// 消費訊息

for

case <-signals:

return}}

}如果需要多個consumer去消費同乙個topic,可以多次呼叫kafkacluster.newconsumer(brokers, "consumer-group1", topics, config)來新建consumer,注意第二個引數是groupid,同乙個group的consumer,它們的groupid必須相同。如果新建了另乙個group,則可以重複消費topic的資料,是不是有點發布訂閱的味道。

同乙個group的consumer,kafka會均衡的為每個consumer分配partition,並遵循策略從partition中彈出訊息給之前分配好的consumer,每個partition每次只能有乙個consumer進行消費,所以consumer的數量應該小於partition數量。

func getkafkapartitionoffset()  

//topics := string

client, err := sarama.newclient(brokers, config2)

if err != nil

defer client.close()

//有個要注意的地方,如果想獲取某個partition的offset位置,需要這個offsetmanager的groupid和consumer的一致,否則拿到的offset是不正確的。

offsetmanager,err:=sarama.newoffsetmanagerfromclient("consumer-group1",client)

if err != nil

defer offsetmanager.close()

partitionoffsetmanager,err:=offsetmanager.managepartition("0612_test",14)

if err != nil

defer partitionoffsetmanager.close()

nextoffset,_:=partitionoffsetmanager.nextoffset()

fmt.println("nextoffset:",nextoffset)

}

這個是獲取某個consumer group對應的某個分割槽的offset位置

Golang之傳送訊息至kafka

3 重新命名conf zoo sample.cfg 為conf zoo.cfg 4 編輯 conf zoo.cfg,修改datadir d zookeeper 3.3.6 data 4 執行bin zkserver.cmd 啟動結果如下 2 開啟config目錄下的server.properties...

golang實現kafka的訊息推送

kafka中涉及的名詞 訊息記錄 由乙個key,乙個value和乙個時間戳構成,訊息最終儲存在主題下的分割槽中,記錄在生產中稱為生產者記錄,在消費者中稱為消費記錄。kafka集群保持了所有發布的訊息,直到它們過期,無論訊息是否被消費了,在乙個可配置的時間段內,kafka集群保留了所有發布的訊息。比如...

關於golang接入Kafka訊息佇列的記錄

version 2 services zookeeper image wurstmeister zookeeper ports 2181 2181 kafka build image wurstmeister kafka volumes var run docker.sock var run doc...