Spark Streaming應用與實戰全攻略

2021-09-10 17:40:44 字數 4272 閱讀 4183

一、背景與架構改造

1.1 問題描述

有一塊業務主要是做爬蟲抓取與資料輸出,通過大資料這邊提供的soa服務入庫到hbase,架構大致如下:

架構改造之前

以對於以上的架構存在一些問題,我們可以看見資料在dubbox服務階段處理後直接通過hbase api入庫了hbase,中間並沒做任何緩衝,要是hbase出現了問題整個集群都完蛋,沒法寫入資料,資料還丟失,hbase這邊壓力也相當大,針對這一點,對入庫hbase這個階段做了一些改造。

1.2 架構改造

改造後的架構,爬蟲通過介面服務,入庫到kafka,spark streaming去消費kafka的資料,入庫到hbase.核心元件如下圖所示:

架構改造圖

為什麼不直接入庫到hbase,這樣做有什麼好處?

緩解了hbase這邊峰值的壓力,並且流量可控;

hbase集群出現問題或者掛掉,都不會照成資料丟失的問題;

增加了吞吐量。

1.3 為什麼選擇kafka和spark streaming

由於kafka它簡單的架構以及出色的吞吐量;

kafka與spark streaming也有專門的整合模組;

spark的容錯,以及現在技術相當的成熟。

二、通過**實現具體細節,並執行專案

然後就開始寫**了,總體思路就是:

put資料構造json資料,寫入kafka;

spark streaming任務啟動後首先去zookeeper中去讀取offset,組裝成fromoffsets;

spark streaming 獲取到fromoffsets後通過kafkautils.createdirectstream去消費kafka的資料;

讀取kafka資料返回乙個inputdstream的資訊,foreachrdd遍歷,同時記錄讀取到的offset到zk中;

寫入資料到hbase。

詳細一點的架構圖

2.1 初始化與配置載入

下面是一些接收引數,載入配置,獲取配置中的topic,還有初始化配置,**如下:

//接收引數

val array(kafka_topic, timewindow, maxrateperpartition) = args

//載入配置

val prop: properties = new properties()

prop.load(this.getclass().getresourceasstream("/kafka.properties"))

val groupname = prop.getproperty("group.id")

//獲取配置檔案中的topic

val kafkatopics: string = prop.getproperty("kafka.topic." + kafka_topic)

if (kafkatopics == null || kafkatopics.length <= 0) /$i"

if (zkclient.exists(path)) else $ $ $")

//topicandpartition 主構造引數第乙個是topic,第二個是kafka partition id

val topicandpartition = topicandpartition(offsetrange.topic, offsetrange.partition)

val either = kc.setconsumeroffsets(groupname, map((topicandpartition, offsetrange.untiloffset))) //是

if (either.isleft) ")

partitionrecords.foreach(data => catch catch $ $ $")

//topicandpartition 主構造引數第乙個是topic,第二個是kafka partition id

val topicandpartition = topicandpartition(offsetrange.topic, offsetrange.partition)

val either = kc.setconsumeroffsets(groupname, map((topicandpartition, offsetrange.untiloffset))) //是

if (either.isleft) ")

/** 解析partitionrecords資料 */

if (offsetrange.topic != null) catch error ", e)

false

4.5 執行

剛測試時給它相對很小的記憶體跑一跑:

[[email protected] ~]# /opt/cloudera/parcels/cdh/bin/spark-submit \

--master yarn-client --num-executors 1 \

--driver-memory 256m --conf spark.yarn.driver.memoryoverhead=256 \

--conf spark.yarn.am.memory=256m --conf spark.yarn.am.memoryoverhead=256  \

--executor-memory 256m --conf spark.yarn.executor.memoryoverhead=256  \

--executor-cores 1  \

--class com.creditease.streaming.kafkadatastream hspark-1.0.jar 1 3 30000

五六萬的插入沒什麼壓力,但是到10萬的時候,就有些卡頓了!!

yarn 容器、cpu、記憶體大小

五六萬的插入沒什麼壓力

當然是需要增大記憶體的,修改配置,都增加一倍:

yarn 容器、cpu、記憶體大小

90000的插入沒什麼壓力

檢視插入資料量,能看到修改後插入資料10萬是沒有什麼壓力的:

檢視插入資料量,能看到修改後插入資料10萬是沒有什麼壓力的

當我們再繼續加大壓力測試的時候,效能下降:

當我們再繼續加大壓力測試的時候,效能下降

檢視統計資訊:

檢視統計資訊

Spark Streaming入門詳解

背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...

Spark Streaming 程式監控

官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...

spark streaming讀取kafka示例

spark streaming讀取kafka示例,其中 spark streaming優雅的關閉策略優化部分參考 如何管理spark streaming消費kafka的偏移量部分參考 spark向kafka中寫入資料部分參考 object demo 建立streamingcontext return...