spark Streaming 學習筆記

2021-09-14 00:02:18 字數 1471 閱讀 8329

import org.apache.spark._

import org.apache.spark.streaming._

val ssc = new streamingcontext(sc, seconds(1)) // seconds(1):批處理間隔為一秒

val lines = ssc.sockettextstream("localhost",9999)

val words = lines.flatmap(_.split(" "))

val pair = words.map(word => (word,1))

val wordcounts = pair.reducebykey(_+_)

wordcounts.print()

ssc.start() //開始

ssc.awaittermination() //等待計算終止

也可以使用 spark 自帶的例子

./bin/run-example streaming.networkwordcount localhost 9999
spark streaming 程式流程:

定義上下文:

val ssc = new streamingcontext(sc, second(2))
通過建立輸入的 dstream 來定義輸入源

通過轉化和輸出操作來定義 dstream 的計算

開始接收輸入並執行計算 streamcontext.start()

等待處理完或出錯後停止 streamingcontext.awaittermination()

手動停止 streamingcontext.stop()

注意事項:①一旦啟動,就不能在新增新的流進去。②一旦停止,就無法重新啟動。③乙個 jvm 只能有乙個streamingcontext。④stop() 會也會停掉 sparkcontext。⑤只要在建立新的 streamingcontext 時停止前乙個 streamingcontext,就可以在乙個 sparkcontext 裡建立多個 streamingcontext。

dstream.foreachrdd

}

可以想下面一樣定義,但是會為每條記錄建立乙個連線, 產生高開銷

dstream.foreachrdd

}

rdd.foreachpartition- 建立單個連線物件並使用該連線傳送rdd分割槽中的所有記錄。

dstream.foreachrdd

connect.close()

}}

進一步優化,可以維護連線物件的連線池

dstream.foreachrdd

connectionpool.returnconnection(connect)

}}

Spark Streaming入門詳解

背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...

Spark Streaming 程式監控

官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...

spark streaming讀取kafka示例

spark streaming讀取kafka示例,其中 spark streaming優雅的關閉策略優化部分參考 如何管理spark streaming消費kafka的偏移量部分參考 spark向kafka中寫入資料部分參考 object demo 建立streamingcontext return...