Kafka 0 9 0 x 控制消費資料數量

2021-07-27 18:59:10 字數 653 閱讀 3700

1、乙個consumer多個partition的情況

因kafka0.9.0.x 只能控制poll資料的時間,如果每次fetch的資料過多而consumer在session timeout的時間內沒處理過來的話,coordinator會認為該consumer已經掛掉了,然後進行rebalance,重新分配partition

這裡只貼一段**:

public

void

init() else

} catch (exception e)

currentoffset = record.offset();

maxpoll++;

if(maxpoll > 20)

}//指定消費到的位置

long lastoffset = currentoffset + 1;

//指定下次poll的位置

consumer.seek(tp, lastoffset);

consumer.commitsync(collections.singletonmap(tp, new offsetandmetadata(lastoffset)));}}

}}).start();

} catch (exception e)

}

storm實時消費kafka資料

原創 2017年06月05日 16 30 15 程式的pom.xml檔案 org.apache.stormgroupid storm coreartifactid 1.0.2version providedscope dependency org.apache.stormgroupid storm ...

Python指令碼消費kafka資料

一 簡介 詳見 二 安裝 詳見部落格 三 按照官網的樣例,先跑乙個應用 1 生產者 from kafka import kafkaproducer producer kafkaproducer bootstrap servers 172.21.10.136 9092 此處ip可以是多個 0.0.0....

Python指令碼消費kafka資料

一 簡介 詳見 二 安裝 詳見部落格 三 按照官網的樣例,先跑乙個應用 1 生產者 from kafka import kafkaproducer producer kafkaproducer bootstrap servers 172.21.10.136 9092 此處ip可以是多個 0.0.0....