上週分享了如何使用go來對kafka進行生產和消費,這周接著對kafka訊息佇列的一些特性來進行使用。
上次講到kafka有個consumer group的概念,而我們使用的sarama並沒有支援,所以這次引入sarama-cluster專案 : go get 「github.com/bsm/sarama-cluster」
//第二個引數是groupid
consumer, err := kafkacluster.newconsumer(brokers, "consumer-group1", topics, config)
if err != nil
defer consumer.close()
signals := make(chan os.signal, 1)
signal.notify(signals, os.interrupt)
// 接收錯誤
go func()
}()// 列印一些rebalance的資訊
go func()
}()// 消費訊息
for
case <-signals:
return}}
}如果需要多個consumer去消費同乙個topic,可以多次呼叫kafkacluster.newconsumer(brokers, "consumer-group1", topics, config)
來新建consumer,注意第二個引數是groupid,同乙個group的consumer,它們的groupid必須相同。如果新建了另乙個group,則可以重複消費topic的資料,是不是有點發布訂閱的味道。
同乙個group的consumer,kafka會均衡的為每個consumer分配partition,並遵循策略從partition中彈出訊息給之前分配好的consumer,每個partition每次只能有乙個consumer進行消費,所以consumer的數量應該小於partition數量。
func getkafkapartitionoffset()
//topics := string
client, err := sarama.newclient(brokers, config2)
if err != nil
defer client.close()
//有個要注意的地方,如果想獲取某個partition的offset位置,需要這個offsetmanager的groupid和consumer的一致,否則拿到的offset是不正確的。
offsetmanager,err:=sarama.newoffsetmanagerfromclient("consumer-group1",client)
if err != nil
defer offsetmanager.close()
partitionoffsetmanager,err:=offsetmanager.managepartition("0612_test",14)
if err != nil
defer partitionoffsetmanager.close()
nextoffset,_:=partitionoffsetmanager.nextoffset()
fmt.println("nextoffset:",nextoffset)
}
這個是獲取某個consumer group對應的某個分割槽的offset位置 Golang之傳送訊息至kafka
3 重新命名conf zoo sample.cfg 為conf zoo.cfg 4 編輯 conf zoo.cfg,修改datadir d zookeeper 3.3.6 data 4 執行bin zkserver.cmd 啟動結果如下 2 開啟config目錄下的server.properties...
golang實現kafka的訊息推送
kafka中涉及的名詞 訊息記錄 由乙個key,乙個value和乙個時間戳構成,訊息最終儲存在主題下的分割槽中,記錄在生產中稱為生產者記錄,在消費者中稱為消費記錄。kafka集群保持了所有發布的訊息,直到它們過期,無論訊息是否被消費了,在乙個可配置的時間段內,kafka集群保留了所有發布的訊息。比如...
關於golang接入Kafka訊息佇列的記錄
version 2 services zookeeper image wurstmeister zookeeper ports 2181 2181 kafka build image wurstmeister kafka volumes var run docker.sock var run doc...