kafka直連方式消費多個topic

2021-09-07 20:30:20 字數 2516 閱讀 8949

乙個消費者組可以消費多個topic,以前寫過一篇乙個消費者消費乙個topic的,這次的是乙個消費者組通過直連方式消費多個topic,做了小測試,結果是正確的,通過檢視zookeeper的客戶端,zookeeper記錄了偏移量

package day04

/*消費多個topic

*/import kafka.common.topicandpartition

import kafka.message.messageandmetadata

import kafka.serializer.stringdecoder

import kafka.utils.

import scala.collection.mutable.listbuffer

import org.i0itec.zkclient.zkclient

import org.apache.spark.sparkconf

import org.apache.spark.streaming.dstream.inputdstream

import org.apache.spark.streaming.kafka.

import org.apache.spark.streaming.

//new listbuffer用來存放zkgrouptopicdirs, 用來儲存偏移量的位址

//因為有多個topic,對應的也就有多個zkgrouptopicdirs

var zkgtlist:listbuffer[zkgrouptopicdirs] =new listbuffer[zkgrouptopicdirs]()

//根據topiclist 新建 zkgrouptopicdirs 新增到zkgtlist

for(tp <- topicslist)

//新建zkclient,用來獲取偏移量和更新偏移量

val zkclient = new zkclient(zkquorum)

//新建乙個inputdstream,要是var,因為有兩種情況,消費過? 沒有消費過? 根據情況賦值

var kafkadstream :inputdstream[(string,string)] = null

//建立乙個map,(key,value)-》( 對應的時topic和分割槽 ,偏移量)

var fromoffset = map[topicandpartition,long]()

//獲取每個topic是否被消費過

var childrens:listbuffer[int] =new listbuffer[int]()

var flag = false //有topic被消費過則為true

for (topicdir <- zkgtlist)

}if(flag)

}//返回的而結果是 kafka的key,預設是null, value是kafka中的值

//建立kafkadstream

kafkadstream = kafkautils.createdirectstream[string,string,stringdecoder,stringdecoder,(string,string)](

ssc,kafkaparams,fromoffset,messagehandler

)}else

/*val children1 = zkclient.countchildren(zkgrouptopicdirs1.consumeroffsetdir)

val children2 = zkclient.countchildren(zkgrouptopicdirs2.consumeroffsetdir)

if(children1>0 || children2>0)

}if(children2>0)

}val messagehandler =(mmd:messageandmetadata[string,string])=>

kafkadstream = kafkautils.createdirectstream[string,string,stringdecoder,stringdecoder,(string,string)](ssc,

kafkaparams,fromoffset,messagehandler)

}else*/

var offsetranges = array[offsetrange]www.hjpt521.com() //用來記錄更新的每個topic的分割槽偏移量

kafkadstream.foreachrdd(kafkardd=>else if(topicnn.equals(topic2))*/}})

ssc.start()

ssc.awaittermination(www.dfgjyl.cn)

可以通過zookeeper的客戶端,在/consumers中檢視偏移量,

我的3個topic中,其中wc和wc1只有1個分割槽,可以通過下圖可看出wc1的0分割槽偏移量13

Kafka直連方式儲存MySQL

記得在之前寫了一篇是mysql基礎使用的,這次就用mysql來儲存direct方式的偏移量。如下 package kafka1 import kafka.common.topicandpartition import kafka.message.messageandmetadata import k...

kafka直連方式api與Redis

kafka作為生產者,把生產的資料儲存到redis中,讀取的是json檔案,需要匯入阿里的包 一 pom檔案進行設定 redis.clients jedis 2.9.0 com.typesafe config 1.3.1 org.scalikejdbc scalikejdbc 2.11 2.5.0 ...

Kafka直連儲存HBase

在之前介紹了kafka與sparkstreaming互動的兩種方式,我提到了公司採用的是direct方式,這次我向大家分享一下將偏移量儲存在hbase中。如下 package kafka1 import kafka.common.topicandpartition import kafka.mess...