Kafka 心跳機制 重複消費

2021-10-24 21:40:29 字數 2194 閱讀 8570

kafka是通過心跳機制來控制消費超時,心跳機制對於消費者客戶端來說是無感的,它是乙個非同步執行緒,當我們啟動乙個消費者例項時,心跳執行緒就開始工作了。心跳超時會導致訊息重複消費

在org.apache.kafka.clients.consumer.internals.abstractcoordinator中會啟動乙個heartbeatthread執行緒來定時傳送心跳和檢測消費者的狀態。每個消費者都有個org.apache.kafka.clients.consumer.internals.consumercoordinator,而每個consumercoordinator都會啟動乙個heartbeatthread執行緒來維護心跳,心跳資訊存放在org.apache.kafka.clients.consumer.internals.heartbeat中,宣告的schema如下所示:

private final int sessiontimeoutms;

private final int heartbeatintervalms;

private final int maxpollintervalms;

private final long retrybackoffms;

private volatile long lastheartbeatsend;

private long lastheartbeatreceive;

private long lastsessionreset;

private long lastpoll;

private boolean heartbeatfailed;

心跳執行緒實現方法

public void run() 

if (state != memberstate.stable)

client.pollnowakeup();

long now = time.milliseconds();

if (coordinatorunknown()) else if (heartbeat.sessiontimeoutexpired(now)) else if (heartbeat.polltimeoutexpired(now)) else if (!heartbeat.shouldheartbeat(now)) else

}@override

public void onfailure(runtimeexception e) else }}

});}}}

} catch (authenticationexception e) catch (groupauthorizationexception e) catch (interruptedexception | interruptexception e) catch (throwable e) finally

}

在心跳執行緒中這裡面包含兩個最重要的超時函式,分別是sessiontimeoutexpired() 和 polltimeoutexpired()。

public boolean sessiontimeoutexpired(long now) 

public boolean polltimeoutexpired(long now)

如果sessiontimeout超時,則會被標記為當前協調器處理斷開, 即將將消費者移除,重新分配分割槽和消費者的對應關係。在kafka broker server中,consumer group定義了5中(如果算上unknown,應該是6種狀態)狀態,org.apache.kafka.common.consumergroupstate,如下圖所示:

如果觸發了poll超時,此時消費者客戶端會退出consumergroup,當再次poll的時候,會重新加入到consumergroup,觸發消費者再平衡策略rebalancegroup。而kafkaconsumer client是不會幫我們重複poll的,需要我們自己在實現的消費邏輯中不停的呼叫poll方法

tcp心跳機制

對連線上來的連線,進行檢測,以防止客戶端異常關閉,或線路異常斷開,而伺服器不知道,得到乙個半連線這種情況。當然可以在協議裡加乙個心跳包,然後伺服器端定時檢測,過一段時間就去輪訓一次,看哪些連線超過多少時間沒有反應。超時就關閉。但這樣有點不爽,要自己寫程式碼來完成。還要鎖定連線列表,代價挺大的。記得以...

Eureka 心跳機制

server服務端 server port 8761 eureka client 例項是否在eureka伺服器上註冊自己的資訊以提供其他服務發現,預設為true register with eureka false 此客戶端是否獲取eureka伺服器登錄檔上的註冊資訊,預設為true fetch r...

tcp心跳機制

對連線上來的連線,進行檢測,以防止客戶端異常關閉,或線路異常斷開,而伺服器不知道,得到乙個半連線這種情況。當然可以在協議裡加乙個心跳包,然後伺服器端定時檢測,過一段時間就去輪訓一次,看哪些連線超過多少時間沒有反應。超時就關閉。但這樣有點不爽,要自己寫程式碼來完成。還要鎖定連線列表,代價挺大的。記得以...