sparkStreaming基礎知識整理

2021-10-23 20:13:32 字數 4084 閱讀 3709

//local[n]  其中n要大於接受器的個數

val sparkconf =

newsparkconf()

.setmaster

("local[2]").

("networkwordcount"

)val ssc =

newstreamingcontext

(sparkconf,

seconds(1

))//建立乙個接收器

val lines = ssc.

sockettextstream

("localhost"

,9999

)//指定資料來源

val words = lines.

flatmap

(_.split

(" "))

val wordcounts = words.

map(x =

>

(x,1))

.reducebykey

(_ + _)

wordcounts.

print()

//開始

ssc.

start()

//等待終止訊號

ssc.

awaittermination

()

val sparkconf =

newsparkconf()

.("hdfswordcount").

setmaster

("local[2]"

)val ssc =

newstreamingcontext

(sparkconf,

seconds(2

))// 建立fileinputdstream去讀取檔案系統上的資料

val lines = ssc.

textfilestream

("hdfs://hadoop131:9000/data"

)//使用空格進行分割每行記錄的字串

val words = lines.

flatmap

(_.split

(" "))

//類似於rdd的程式設計,將每個單詞賦值為1,並進行合併計算

val wordcounts = words.

map(x =

>

(x,1))

.reducebykey

(_ + _)

wordcounts.

print()

ssc.

start()

ssc.

awaittermination

()

flume資料來源

1、push的方式讀取資料

val conf: sparkconf =

newsparkconf()

.("flumedemo").

setmaster

("local[3]"

) val ssc =

newstreamingcontext

(conf,

seconds(5

))//push 方式 由主機推送資料給sparkstreaming 需要先啟動sparkstreaming

val flumestream: receiverinputdstream[sparkflumeevent]

= flumeutils.

createstream

(ssc,

"hadoop131"

,5678

)//flume 作為sparking streaming 的實時資料流 每一條資料是乙個event 故此時形成的dstream中的資料是乙個乙個的event

//event 有body 和header

flumestream.

map(x=

>

newstring

(x.event.getbody

.array()

).trim)

.flatmap

(_.split

(" "))

.map

((_,1)

).reducebykey

(_+_)

.print()

ssc.

start()

ssc.

awaittermination

()

2、poll的方式獲取資料

val conf: sparkconf =

newsparkconf()

.("flumedemo").

setmaster

("local[3]"

) val ssc =

newstreamingcontext

(conf,

seconds(5

))//poll方式 主動拉取資料,需要先啟動flume

val flumestream=flumeutils.

createpollingstream

(ssc,

"hadoop131"

,5678

) flumestream.

map(x=

>

newstring

(x.event.getbody.

array()

).trim)

.flatmap

(_.split

(" "))

.map

((_,1)

).reducebykey

(_+_)

.print()

ssc.

start()

ssc.

awaittermination

()

kafka資料來源

//設定主函式的引數  第乙個是brokers  第二個是topics  可以使用逗號隔開 傳入多個topics

//sparkstreaming 可以一次性讀取 kafka中的多個topic中的資料

val array

(brokers, topics)

= args

val sparkconf =

newsparkconf()

.("directkafkawordcount").

setmaster

("local[1]"

) val ssc =

newstreamingcontext

(sparkconf,

seconds(2

))val topicsset = topics.

split

(","

).toset

val kafkaparams = map[string, string]

("bootstrap.servers"

-> brokers)

val messages = kafkautils.createdirectstream[string, string]

(ssc,

locationstrategies.preferconsistent,

consumerstrategies.subscribe[string,

string]

(topicsset,kafkaparams)

) messages.

map(_.

value()

)// 取出 value

.flatmap

(_.split

(" "))

// 將字串使用空格分隔

.map

(word =

>

(word,1)

)// 每個單詞對映成乙個 pair

.reducebykey

(_+_)

// 根據每個 key 進行累加

.print()

// 列印前 10 個資料

ssc.

start()

ssc.

awaittermination

()

Spark Streaming入門詳解

背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...

Spark Streaming 程式監控

官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...

spark streaming讀取kafka示例

spark streaming讀取kafka示例,其中 spark streaming優雅的關閉策略優化部分參考 如何管理spark streaming消費kafka的偏移量部分參考 spark向kafka中寫入資料部分參考 object demo 建立streamingcontext return...