Spark Streaming反壓機制

2021-09-24 02:02:52 字數 4734 閱讀 5886

反壓(back pressure)機制主要用來解決流處理系統中,處理速度比攝入速度慢的情況。是控制流處理中批次流量過載的有效手段。

spark streaming中的反壓機制是spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。

當批處理時間(batch processing time)大於批次間隔(batch interval,即 batchduration)時,說明處理資料的速度小於資料攝入的速度,持續時間過長或源頭資料暴增,容易造成資料在記憶體中堆積,最終導致executor oom或任務奔潰。

在這種情況下,若是基於receiver的資料來源,可以通過設定spark.streaming.receiver.maxrate來控制最大輸入速率;若是基於direct的資料來源(如kafka direct stream),則可以通過設定spark.streaming.kafka.maxrateperpartition來控制最大輸入速率。當然,在事先經過壓測,且流量高峰不會超過預期的情況下,設定這些引數一般沒什麼問題。但最大值,不代表是最優值,最好還能根據每個批次處理情況來動態預估下個批次最優速率。在spark 1.5.0以上,就可通過背壓機制來實現。開啟反壓機制,即設定spark.streaming.backpressure.enabledtrue,spark streaming會自動根據處理能力來調整輸入速率,從而在流量高峰時仍能保證最大的吞吐和效能。

spark streaming的反壓機制主要是通過ratecontroller元件來實現。ratecontroller繼承自介面streaminglistener,並實現了onbatchcompleted方法。每乙個batch處理完成後都會呼叫此方法,具體如下:

override def onbatchcompleted

(batchcompleted: streaminglistenerbatchcompleted)

computeandpublish

(processingend, elems, workdelay, waitdelay)

}

可以看到,接著又呼叫的是computeandpublish方法,如下:

private def computeandpublish

(time: long, elems: long, workdelay: long, waitdelay: long)

: unit =

future[unit]

}

更深一層,具體呼叫的是rateestimator.compute方法來預估新速率,如下:

def compute

( time: long,

elements: long,

processingdelay: long,

schedulingdelay: long)

: option[double]

該方法是介面rateestimator中的方法,會計算出新的批次每秒應攝入的記錄數。pidrateestimator,即pid速率估算器,是rateestimator的唯一實現,具體估算邏輯可看pidrateestimator.compute方法,邏輯很複雜,用到了微積分相關的知識,總之,一句話,即根據當前batch的結果和期望的差值來估算新的輸入速率。

spark.streaming.backpressure.enabled

預設值false,是否啟用反壓機制。

spark.streaming.backpressure.initialrate

預設值無,初始最大接收速率。只適用於receiver stream,不適用於direct stream

spark.streaming.backpressure.rateestimator

預設值pid,速率控制器,spark 預設只支援此控制器,可自定義。

spark.streaming.backpressure.pid.proportional

預設值1.0,只能為非負值。當前速率與最後一批速率之間的差值對總控制訊號貢獻的權重。用預設值即可。

spark.streaming.backpressure.pid.integral

預設值0.2,只能為非負值。比例誤差累積對總控制訊號貢獻的權重。用預設值即可。

spark.streaming.backpressure.pid.derived

預設值0.0,只能為非負值。比例誤差變化對總控制訊號貢獻的權重。用預設值即可。

spark.streaming.backpressure.pid.minrate

預設值100,只能為正數,最小速率。

//啟用反壓

conf.

set(

"spark.streaming.backpressure.enabled"

,"true"

)//最小攝入條數控制

conf.

set(

"spark.streaming.backpressure.pid.minrate"

,"1"

)//最大攝入條數控制

conf.

set(

"spark.streaming.kafka.maxrateperpartition"

,"12"

)

注意:

a. 反壓機制真正起作用時需要至少處理乙個批

由於反壓機制需要根據當前批的速率,預估新批的速率,所以反壓機制真正起作用前,應至少保證處理乙個批。

b. 如何保證反壓機制真正起作用前應用不會崩潰

要保證反壓機制真正起作用前應用不會崩潰,需要控制每個批次最大攝入速率。若為direct stream,如kafka direct stream,則可以通過spark.streaming.kafka.maxrateperpartition引數來控制。此引數代表了 每秒每個分割槽最大攝入的資料條數。假設batchduration為10秒,spark.streaming.kafka.maxrateperpartition為12條,kafka topic 分割槽數為3個,則乙個批(batch)最大讀取的資料條數為360條(3*12*10=360)。同時,需要注意,該引數也代表了整個應用生命週期中的最大速率,即使是背壓調整的最大值也不會超過該引數。

建立速率控制器

info pidrateestimator: created pidrateestimator with proportional =

1.0, integral =

0.2, derivative =

0.0, min rate =

1.0

計算當前批次速率
// records 記錄數(對應webui: input size)

// processing time 處理時間,毫秒(對應webui: processing time)

// scheduling delay 排程時間,毫秒(對應webui: scheduling delay)

trace pidrateestimator:

time =

1558888897224

, # records =

33, processing time =

24548

, scheduling delay =

8

預估新批次速率
trace pidrateestimator: 

latestrate =

-1.0

, error =

-2.344305035033404

latesterror =

-1.0

, historicalerror =

0.0010754440280267231

delaysinceupdate =

1.558888897225e9

, derror =

-8.623482003280801e-10

第一次計算跳過速率估計
trace pidrateestimator: first run, rate estimation skipped
當前批次沒有記錄或沒有延遲則跳過速率估計
trace pidrateestimator: rate estimation skipped
以新的預估速率執行
trace pidrateestimator: new rate

=1.0

可以看到,開啟反壓後,攝入速率input rate可以根據處理時間processing time來調整,從而保證應用的穩定性。

Spark Streaming反壓機制探秘

spark streaming中的反壓機制是spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。當批處理時間 batch processing time 大於批次間隔 batch interval,即 batchduration 時,說明處理資料的速度小於資料攝入的速度,持續時間過...

Spark Streaming反壓機制原理及使用

spark streaming中的反壓機制是spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。當批處理時間 batch processing time 大於批次間隔 batch interval,即 batchduration 時,說明處理資料的速度小於資料攝入的速度,持續時間過...

Spark Streaming入門詳解

背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...