Kafka直連方式儲存MySQL

2021-09-01 06:32:27 字數 1348 閱讀 8805

記得在之前寫了一篇是mysql基礎使用的,這次就用mysql來儲存direct方式的偏移量。

**如下:

package kafka1

import kafka.common.topicandpartition

import kafka.message.messageandmetadata

import kafka.serializer.stringdecoder

import org.apache.spark.sparkconf

import org.apache.spark.streaming.dstream.inputdstream

import org.apache.spark.streaming.kafka.kafkacluster.err

import org.apache.spark.streaming.kafka.

import org.apache.spark.streaming.

import scalikejdbc.

import scalikejdbc.config.dbs

/*將偏移量儲存到mysql中

*/class directmysql '")

.map(m=>(topicandpartition(

m.string("topic"),m.int("partitions")),m.long("untiloffsets")))

}.tomap //最後要tomap一下,因為前面的返回值已經給定

//建立乙個inputdstream,然後根據offset讀取資料

var kafkastream:inputdstream[(string,string)]=null

//從mysql中獲取資料,進行判斷

if(fromdboffset.size==0)elseelse})}

val messagehandler = (mmd:messageandmetadata[string,string])=>

kafkastream= kafkautils.createdirectstream[string,string,

stringdecoder,stringdecoder,

(string,string)](ssc,kafkas,checkoffsets,messagehandler)

}//開始處理資料流,和zk一樣

kafkastream.foreachrdd(kafkardd=>)})

ssc.start()

ssc.awaittermination()

}}

summed up by jiamingcan

Kafka直連儲存HBase

在之前介紹了kafka與sparkstreaming互動的兩種方式,我提到了公司採用的是direct方式,這次我向大家分享一下將偏移量儲存在hbase中。如下 package kafka1 import kafka.common.topicandpartition import kafka.mess...

kafka直連方式api與Redis

kafka作為生產者,把生產的資料儲存到redis中,讀取的是json檔案,需要匯入阿里的包 一 pom檔案進行設定 redis.clients jedis 2.9.0 com.typesafe config 1.3.1 org.scalikejdbc scalikejdbc 2.11 2.5.0 ...

kafka直連方式消費多個topic

乙個消費者組可以消費多個topic,以前寫過一篇乙個消費者消費乙個topic的,這次的是乙個消費者組通過直連方式消費多個topic,做了小測試,結果是正確的,通過檢視zookeeper的客戶端,zookeeper記錄了偏移量 package day04 消費多個topic import kafka....