kafka消費者低階API

2021-09-30 19:41:05 字數 3320 閱讀 6985

實現使用低階api讀取指定topic,指定partition,指定offset的資料。

1)消費者使用低階api 的主要步驟:

步驟主要工作

1根據指定的分割槽從主題元資料中找到主副本

2獲取分割槽最新的消費進度

3從主副本拉取分割槽的訊息

4識別主副本的變化,重試

指定分割槽,指定offset

(1)根據partition找到leader

(2)找到offset

(3)找到leader中的副本,進行儲存,方便識別主副本的變化

2)方法描述:

findleader()

客戶端向種子節點傳送主題元資料,將副本集加入備用節點

getlastoffset()

消費者客戶端傳送偏移量請求,獲取分割槽最近的偏移量

run()

消費者低階ap i拉取訊息的主要方法

findnewleader()

當分割槽的主副本節點發生故障,客戶將要找出新的主副本

public class ******example

public static void main(string args) catch (exception e)

}public void run(long a_maxreads, string a_topic, int a_partition, lista_seedbrokers, int a_port) throws exception

if (metadata.leader() == null)

string leadbroker = metadata.leader().host();

string clientname = "client_" + a_topic + "_" + a_partition;

//獲取資料的消費者物件

******consumer consumer = new ******consumer(leadbroker, a_port, 100000, 64 * 1024, clientname);

long readoffset = getlastoffset(consumer, a_topic, a_partition, kafka.api.offsetrequest.earliesttime(), clientname);

int numerrors = 0;

while (a_maxreads > 0)

//可以有多個addfetch方法,也就是這裡可以傳入多個topic和partition

fetchrequest req = new fetchrequestbuilder().clientid(clientname).addfetch(a_topic, a_partition, readoffset, 100000).build();

fetchresponse fetchresponse = consumer.fetch(req);

if (fetchresponse.haserror())

consumer.close();

consumer = null;

leadbroker = findnewleader(leadbroker, a_topic, a_partition, a_port);

continue;

}numerrors = 0;

long numread = 0;

for (messageandoffset messageandoffset :fetchresponse.messageset(a_topic, a_partition))

readoffset = messageandoffset.nextoffset();

bytebuffer payload = messageandoffset.message().payload();

byte bytes = new byte[payload.limit()];

payload.get(bytes);

system.out.println(string.valueof(messageandoffset.offset()) + ": " + new string(bytes, "utf-8"));

numread++;

a_maxreads--;

}if (numread == 0) catch (interruptedexception ie) }}

if (consumer != null)

consumer.close();

}public static long getlastoffset(******consumer consumer, string topic, int partition, long whichtime, string clientname)

long offsets = response.offsets(topic, partition);

return offsets[0];

}//當主leader掛了後,只需要從follower獲取變成leader的主機

private string findnewleader(string a_oldleader, string a_topic, int a_partition, int a_port)throws exception else if (metadata.leader() == null) else if (a_oldleader.equalsignorecase(metadata.leader().host()) && i == 0) else

if (gotosleep)

}system.out.println("unable to find new leader after broker failure. exiting");

throw new exception("unable to find new leader after broker failure. exiting");

}private partitionmetadatafindleader(lista_seedbrokers, int a_port, string a_topic, int a_partition) }}

} catch (exception e) finally

}if (returnmetadata != null)

}return returnmetadata;}}

kafka消費者無法消費異常

今天被乙個kafka消費異常折磨了一天,頭差點炸了,還好最後解決了它 異常 伺服器 record is corrupt 記錄損壞 不明原因 有可能磁碟空間不足導致 導致消費者無法正常消費訊息 卡在某乙個offset 不能繼續消費 解決辦法 先停掉消費者程式 殺掉程序 不可關閉kafka服務 然後手動...

kafka 主動消費 Kafka消費者的使用和原理

publicstaticvoidmain string args finally 前兩步和生產者類似,配置引數然後根據引數建立例項,區別在於消費者使用的是反序列化器,以及多了乙個必填引數 group.id,用於指定消費者所屬的消費組。關於消費組的概念在 kafka中的基本概念 中介紹過了,消費組使得...

010 消費者消費訊息API

一 概述 在前面講述api的時候,沒有說明消費者的api,本次在這裡需要重點的進行說明,二 建立消費者 我們使用現在推薦使用的方式來建立乙個消費者.下面展示建立乙個消費者具體的 現在推薦使用的就是建立乙個defaultconsumer的子類,重寫其中對應的方法,這是一種面向事件的程式設計模型.con...