010 消費者消費訊息API

2022-09-14 05:27:14 字數 718 閱讀 1283

一 ,概述

在前面講述api的時候,沒有說明消費者的api,本次在這裡需要重點的進行說明, 

二 .建立消費者

我們使用現在推薦使用的方式來建立乙個消費者.

下面展示建立乙個消費者具體的**:

現在推薦使用的就是建立乙個defaultconsumer的子類,重寫其中對應的方法,這是一種面向事件的程式設計模型.

consumer consumer = new

defaultconsumer(channel)

};

我們使用上面的方式建立了乙個消費者,同時我們重寫了乙個接受訊息的方法.

在上面的圖中,我們看到了很多的**,我們選擇合適的方法進行**處理.

三.消費訊息

在消費訊息的時候,我們會使用basicconsumer方法進行.

string basicconsume(string queue, boolean autoack, consumer callback) throws ioexception;
這個方法是我們最常用的消費者消費訊息的方法了.

我們只需要注意乙個引數就好了,是否自動完成ack.

這裡需要介紹一下ack的概念,由於訊息中介軟體需要保證訊息的不丟失,只有乙個消費得到了對應的ack之後,才會在訊息中介軟體之中刪除.

如果這裡設定為true,消費者在獲取到訊息之後就會自動的傳送乙個ack.

一般情況下,我們都會手動的返回ack.

kafka消費者低階API

實現使用低階api讀取指定topic,指定partition,指定offset的資料。1 消費者使用低階api 的主要步驟 步驟主要工作 1根據指定的分割槽從主題元資料中找到主副本 2獲取分割槽最新的消費進度 3從主副本拉取分割槽的訊息 4識別主副本的變化,重試 指定分割槽,指定offset 1 根...

生產消費者

producer consumer model include include define buffer size 100 緩衝區數量 define max seq 200 define n consumer 10 消費者數量 define n producer 3 生產者數量 define t ...

kafka消費者無法消費異常

今天被乙個kafka消費異常折磨了一天,頭差點炸了,還好最後解決了它 異常 伺服器 record is corrupt 記錄損壞 不明原因 有可能磁碟空間不足導致 導致消費者無法正常消費訊息 卡在某乙個offset 不能繼續消費 解決辦法 先停掉消費者程式 殺掉程序 不可關閉kafka服務 然後手動...