rdkafka 儲存offset到本地檔案

2021-09-01 05:39:29 字數 1304 閱讀 2664

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!

為了支援斷點續傳功能,需要將offset儲存在乙個地方,下次從這個offset開始。librdkafka提供了本地檔案儲存的方式。

下面的**演示了

1. 要用topic config物件設定 offset.store.path和offset.store.method

2. start函式接受引數offset_stored

std::unique_ptr

consumer(rdkafka::consumer::create(global_conf_.get(), err_));  if (!consumer)   stringstream stream;  stream << topic_name_ << "-"

<< partition_idx << ".txt";  string file_path = stream.str();  string lasterr;  topic_conf_->set("offset.store.path", file_path, lasterr);  topic_conf_->set("offset.store.method", "file", lasterr);  std::unique_ptr

topic(rdkafka::topic::create(consumer.get(), topic_name_, topic_conf_.get(), err_));  //  rdkafka::errorcode resp = consumer->start(topic.get(), partition_idx, rdkafka::topic::offset_beginning);  rdkafka::errorcode resp = consumer->start(topic.get(), partition_idx, rdkafka::topic::offset_stored);  if (resp != rdkafka::err_no_error)

注意:

1. 當前程序目錄下會出現$topic-$partiton-idx.txt檔案

2. 執行後正常讀取資料後需要過一會兒才會寫入數值

這個寫入時間可以設定:offset.store.sync.interval.ms

3. 程式啟動時,可以先手動將offset寫入檔案,然後再啟動

詳細配置參考github

給我老師的人工智慧教程打call!

rdkafka執行緒過多 kafka 多執行緒消費

一 1 kafka的消費並行度依賴topic配置的分割槽數,如分割槽數為10,那麼最多10臺機器來並行消費 每台機器只能開啟乙個執行緒 或者一台機器消費 10個執行緒並行消費 即消費並行度和分割槽數一致。2 1 如果指定了某個分割槽,會只講訊息發到這個分割槽上 2 如果同時指定了某個分割槽和key,...

offset巨集的講解

define offsetof type,member size t type 0 member 對這個巨集的講解我們大致可以分為以下4步進行講解 1 type 0 0位址強制 轉換 為type 結構型別的指標 2 type 0 member 訪問type 結構中的 member 資料成員 3 ty...

offset用法(滑動載入)

第一 select from table limit 10,5 含義是跳過10條取出5條資料,limit後面是從第10條開始讀,讀取5條資訊,即讀取5條資料 第二 select from table limit 10 offset 5 含義是從第5條 不包括 資料開始取出10條資料,limit後面跟...