Kafka直連儲存HBase

2021-09-01 06:31:23 字數 1755 閱讀 8575

在之前介紹了kafka與sparkstreaming互動的兩種方式,我提到了公司採用的是direct方式,這次我向大家分享一下將偏移量儲存在hbase中。

**如下:

package kafka1

import kafka.common.topicandpartition

import kafka.message.messageandmetadata

import kafka.serializer.stringdecoder

import kafka.utils.zkutils

import org.apache.hadoop.hbase.client.

import org.apache.hadoop.hbase.util.bytes

import org.apache.hadoop.hbase.

import org.apache.spark.sparkconf

import org.apache.spark.streaming.dstream.inputdstream

import org.apache.spark.streaming.kafka.

import org.apache.spark.streaming.

object kafkahbasemanager

table.put(put)

conn.close()

} // 從zookeeper中獲取topic的分割槽數

def getnumberofpartitionsfortopicfromzk(topic_name: string, group_id: string,

zkquorum: string, zkrootdir: string, sesstimeout: int, conntimeout: int): int =

// 獲取hbase的offset

def getlastestoffsets(topic_name: string, group_id: string, htablename: string,

zkquorum: string, zkrootdir: string, sesstimeout: int, conntimeout: int): map[topicandpartition, long] =

val fromoffsets = collection.mutable.map[topicandpartition, long]()

if (hbasenumberofpartitions == 0)

} else if (zknumberofpartitions > hbasenumberofpartitions)

// 對新增加的分割槽將它的offset值設為0

for (partition <- hbasenumberofpartitions until zknumberofpartitions)

} else

}scanner.close()

conn.close()

fromoffsets.tomap

} def main(args: array[string]): unit =

})ssc.start()

ssc.awaittermination()

}}

存放在hbase中**有點麻煩,接下來我的部落格中會像大家介紹兩種比較簡單的。

summed up by jiamingcan

Kafka直連方式儲存MySQL

記得在之前寫了一篇是mysql基礎使用的,這次就用mysql來儲存direct方式的偏移量。如下 package kafka1 import kafka.common.topicandpartition import kafka.message.messageandmetadata import k...

kafka直連方式api與Redis

kafka作為生產者,把生產的資料儲存到redis中,讀取的是json檔案,需要匯入阿里的包 一 pom檔案進行設定 redis.clients jedis 2.9.0 com.typesafe config 1.3.1 org.scalikejdbc scalikejdbc 2.11 2.5.0 ...

kafka直連方式消費多個topic

乙個消費者組可以消費多個topic,以前寫過一篇乙個消費者消費乙個topic的,這次的是乙個消費者組通過直連方式消費多個topic,做了小測試,結果是正確的,通過檢視zookeeper的客戶端,zookeeper記錄了偏移量 package day04 消費多個topic import kafka....