Spark Streaming反壓機制原理及使用

2021-10-24 19:05:09 字數 3220 閱讀 4115

spark streaming中的反壓機制是spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。

當批處理時間(batch processing time)大於批次間隔(batch interval,即 batchduration)時,說明處理資料的速度小於資料攝入的速度,持續時間過長或源頭資料暴增,容易造成資料在記憶體中堆積,最終導致executor oom或任務奔潰。

在這種情況下,若是基於kafka receiver的資料來源,可以通過設定spark.streaming.receiver.maxrate來控制最大輸入速率;若是基於direct的資料來源(如kafka direct stream),則可以通過設定spark.streaming.kafka.maxrateperpartition來控制最大輸入速率。當然,在事先經過壓測,且流量高峰不會超過預期的情況下,設定這些引數一般沒什麼問題。但最大值,不代表是最優值,最好還能根據每個批次處理情況來動態預估下個批次最優速率。在spark 1.5.0以上,就可通過背壓機制來實現。開啟反壓機制,即設定spark.streaming.backpressure.enabled為true,spark streaming會自動根據處理能力來調整輸入速率,從而在流量高峰時仍能保證最大的吞吐和效能。

override

def onbatchcompleted(batchcompleted: streaminglistenerbatchcompleted)

computeandpublish(processingend, elems, workdelay, waitdelay)

}

可以看到,接著又呼叫的是computeandpublish方法,如下:

private

def computeandpublish(time:

long

, elems:

long

, workdelay:

long

, waitdelay:

long):

unit

= future[

unit

]}

更深一層,具體呼叫的是rateestimator.compute方法來預估新速率,如下:

def compute(

time:

long

, elements:

long

, processingdelay:

long

, schedulingdelay:

long

): option[

double

]

spark.streaming.backpressure.enabled

預設值false,是否啟用反壓機制。

spark.streaming.backpressure.initialrate

預設值無,初始最大接收速率。只適用於receiver stream,不適用於direct stream。型別為整數,預設直接讀取所有,在1開啟的情況下,限制第一次批處理應該消費的資料,因為程式冷啟動佇列裡面有大量積壓,防止第一次全部讀取,造成系統阻塞

spark.streaming.kafka.maxrateperpartition

型別為整數,預設直接讀取所有,限制每秒每個消費執行緒讀取每個kafka分割槽最大的資料量

spark.streaming.stopgracefullyonshutdown

優雅關閉,確保在kill任務時,能夠處理完最後一批資料,再關閉程式,不會發生強制kill導致資料處理中斷,沒處理完的資料丟失

只有 1+3 啟用的時候,每次消費讀取的數量最大會等於3設定的值,最小是spark根據系統負載自動推斷的值,消費的資料量會在這兩個範圍之內變化根據系統情況,但第一次啟動會有多少讀多少資料。此後按 1+3 設定規則執行

1+2+3 同時啟用的時候,跟上乙個消費情況基本一樣,但第一次消費會得到限制,因為我們設定第一次消費的頻率了。

spark.streaming.backpressure.rateestimator

預設值pid,速率控制器,spark 預設只支援此控制器,可自定義。

spark.streaming.backpressure.pid.proportional

預設值1.0,只能為非負值。當前速率與最後一批速率之間的差值對總控制訊號貢獻的權重。用預設值即可。

spark.streaming.backpressure.pid.integral

預設值0.2,只能為非負值。比例誤差累積對總控制訊號貢獻的權重。用預設值即可。

spark.streaming.backpressure.pid.derived

預設值0.0,只能為非負值。比例誤差變化對總控制訊號貢獻的權重。用預設值即可。

spark.streaming.backpressure.pid.minrate

預設值100,只能為正數,最小速率。

//啟用反壓機制

conf.set(

"spark.streaming.backpressure.enabled"

,"true"

)//最小攝入條數控制

conf.set(

"spark.streaming.backpressure.pid.minrate"

,"1"

)//最大攝入條數控制

conf.set(

"spark.streaming.kafka.maxrateperpartition"

,"12"

)//初始最大接收速率控制

conf.set(

"spark.streaming.backpressure.initialrate"

,"10"

)

要保證反壓機制真正起作用前spark 應用程式不會崩潰,需要控制每個批次最大攝入速率。以direct stream為例,如kafka direct stream,則可以通過spark.streaming.kafka.maxrateperpartition引數來控制。此引數代表了 每秒每個分割槽最大攝入的資料條數。假設batchduration為10秒,spark.streaming.kafka.maxrateperpartition為12條,kafka topic 分割槽數為3個,則乙個批(batch)最大讀取的資料條數為360條(31210=360)。同時,需要注意,該引數也代表了整個應用生命週期中的最大速率,即使是背壓調整的最大值也不會超過該引數。

Spark Streaming反壓機制

反壓 back pressure 機制主要用來解決流處理系統中,處理速度比攝入速度慢的情況。是控制流處理中批次流量過載的有效手段。spark streaming中的反壓機制是spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。當批處理時間 batch processing time...

Spark Streaming反壓機制探秘

spark streaming中的反壓機制是spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。當批處理時間 batch processing time 大於批次間隔 batch interval,即 batchduration 時,說明處理資料的速度小於資料攝入的速度,持續時間過...

Spark Streaming入門詳解

背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...