Kafka 0 8公升級至0 10消費時做出的改動

2021-10-01 12:33:00 字數 1421 閱讀 4978

kafka 0.8版本公升級為0.10版本時,消費**需要做出一些修改,如下:

kafka 0.8版本:

val kafkaparams = map[string, string](

"metadata.broker.list" -> kafka_ip, //此處為kafka對應的ip

"refresh.leader.backoff.ms" -> "30000")

val lines = kafkautils

.createdirectstream[string, string, stringdecoder, stringdecoder]( //根據kafka資料中key value的型別進行選擇,如string

ssc,

kafkaparams,

topics)

val infos = lines.reducebykey((a: string, b: string) => yourfunc(a, b), 100) //此處可使用自己的方法對同一key下的多個value進行相關操作

kafka 0.10版本:

val kafkaparams = map[string, object](

"bootstrap.servers" -> kafka_ip, //此處需將0.8中的「metadata.broker.list」改為「bootstrap.servers」

"key.deserializer" -> classof[stringdeserializer], //需要在此處對kafka資料進行序列化

"value.deserializer" -> classof[stringdeserializer],

"group.id" -> "my_test", 版本的快取需要將topic的分割槽和groupid作為key,此處group.id可自己定義名字

"auto.offset.reset" -> "latest",

"refresh.leader.backoff.ms" -> "30000")

var lines = kafkautils.createdirectstream[string, string](ssc, preferconsistent, subscribe[string, string](topicset, kafkaparams));//createdirectstream的方式與0.8版不同,自行比較

val infos = lines.map(record => (record.key(),record.value()))// 注意:資料流中的每一項都是乙個consumerrecord類,本步操作後可執行reducebykey

val infos1 = infos.reducebykey((a: string, b: string) => yourfunc(a, b), 100)

zabbix自帶php5 4公升級至7 2公升級步驟

公升級前備份 etc php.ini 1.解除安裝原來低版本的php rpm qa grep php xargs i rpm e nodeps2.更新yum源 rpm uvh rpm uvh 生成一些repo檔案在 etc yum.repos.d 目錄下 ls etc yum.repos.d epe...

Fedora 15公升級核心至3 0 4

安裝的fedoar 15,核心版本2.9.40,不知什麼原因,無線網絡卡的驅動可以工作,但是無線上網效率不超過50k s,自己試著更換驅動,效果沒有改善。雖然有人說2.9.40核心和3.0核心沒有什麼實質性的差別,但是自己還是試著手動編譯了linux 3.0.4核心,並且安裝成功。安裝完成後,無線上...

Fedora 15公升級核心至3 0 4

安裝的fedoar 15,核心版本2.9.40,不知什麼原因,無線網絡卡的驅動可以工作,但是無線上網效率不超過50k s,自己試著更換驅動,效果沒有改善。雖然有人說2.9.40核心和3.0核心沒有什麼實質性的差別,但是自己還是試著手動編譯了linux 3.0.4核心,並且安裝成功。安裝完成後,無線上...