SparkStreaming 整合kafka例項

2021-09-28 21:41:35 字數 2805 閱讀 8625

核心概念

下面介紹kafka相關概念,以便執行下面例項的同時,更好地理解kafka.

接下來在ubuntu系統環境下測試簡單的例項。按順序執行如下命令:

進入kafka所在的目錄

命令執行後不會返回shell命令輸入狀態,zookeeper就會按照預設的配置檔案啟動服務,請千萬不要關閉當前終端.啟動新的終端,輸入如下命令:

cd /usr/local/kafka

bin/kafka-server-start.sh config/server.properties

kafka服務端就啟動了,請千萬不要關閉當前終端。啟動另外乙個終端,輸入如下命令:

cd /usr/local/kafka

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zhaogw

topic是發布訊息發布的category,以單節點的配置建立了乙個叫zhaogw的topic.可以用list列出所有建立的topics,來檢視剛才建立的主題是否存在。

可以在結果中檢視到zhaogw這個topic存在。接下來用producer生產點資料:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic zhaogw
hello world

hello hadoop

然後再次開啟新的終端或者直接按ctrl+c退出。然後使用consumer來接收資料,輸入如下命令:

便可以看到剛才產生的三條資訊。說明kafka安裝成功。

package com.zgw.spark.streaming

import org.apache.spark.sparkconf

import org.apache.spark.streaming.dstream.

import org.apache.spark.streaming.kafka.kafkautils

import org.apache.spark.streaming.

/** * created by zhaogw&lss on 2019/10/22.

*/object sparkstreaming_kafka

}

核心**

//kafka stream

val kafkadsream: receiverinputdstream[

(string, string)

]= kafkautils.

createstream

( streamcontext,

"dblab-virtualbox:2181"

,"zhaogw"

,map

("zhaogw"

->3)

)

檢視該方法的原始碼

def createstream

( ssc: streamingcontext,

zkquorum: string,

groupid: string,

topics: map[string, int]

, storagelevel: storagelevel = storagelevel.memory_and_disk_ser_2

)

這裡使用到了kafka的工具類kafkautils,第乙個引數是streamingcontext物件,第二個引數是zk所在的主機名(與hosts檔案中的配置對應),第三個引數是groupid,第四個引數是topics。

啟動專案,並在linux主機傳送兩條訊息

然後再idea中就可以看見消費的資料了

Spark Streaming入門詳解

背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...

Spark Streaming 程式監控

官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...

spark streaming讀取kafka示例

spark streaming讀取kafka示例,其中 spark streaming優雅的關閉策略優化部分參考 如何管理spark streaming消費kafka的偏移量部分參考 spark向kafka中寫入資料部分參考 object demo 建立streamingcontext return...