go saram 消費者呼叫邏輯

2021-10-07 06:47:15 字數 1389 閱讀 1053

業務層:

newconsumer(client.mbrokers, group, topic, config)
cluster-comsuer.go

構造consumer:
consumer, err := sarama.newconsumerfromclient(client.client)
c.client.refreshcoordinator(groupid)//獲取組協調者
起迴圈

for {

c.nexttick

nexttick:

重平衡->訂閱主題->fetchoffsets->針對該主題分割槽下的offset起乙個分割槽消費者->然後呼叫分割槽消費者的consumepartition

comsumer.go

consumepartition(topic string, partition int32, offset int64)->
c.refbrokerconsumer(leader)->
c.newbrokerconsumer(broker)->
go withrecover(bc.subscriptionconsumer)->
bc.fetchnewmessages()->
bc.broker.fetch(request)
broker.go

b.sendandreceive(request, response)->
versioneddecode(buf, res, req.version())  and res is
respons, response is fetchresponse type
encode_decoder.go

in.decode(&helper, version)  fetchresponse type decode is:
fetch_repsonse.go

decode(pd packetdecoder, version int16)->
records.decode(recordsdecoder);
records.go

decode(pd packetdecoder)->
record_batch.go

decode(pd packetdecoder) include 解壓縮

生產消費者

producer consumer model include include define buffer size 100 緩衝區數量 define max seq 200 define n consumer 10 消費者數量 define n producer 3 生產者數量 define t ...

生產者消費者 生產者與消費者模式

一 什麼是生產者與消費者模式 其實生產者與消費者模式就是乙個多執行緒併發協作的模式,在這個模式中呢,一部分執行緒被用於去生產資料,另一部分執行緒去處理資料,於是便有了形象的生產者與消費者了。而為了更好的優化生產者與消費者的關係,便設立乙個緩衝區,也就相當於乙個資料倉儲,當生產者生產資料時鎖住倉庫,不...

生成者消費者問題

自己實踐的 生成者消費者問題 public class threaddemo 生產者 class producer implements runnable public void run 消費者 class consumer implements runnable public void run c...