mapreduce推測執行目前版本的判定準則

2021-10-13 22:59:20 字數 2842 閱讀 5626

目前版本的判定準則:

新版本仍然對每個任務至多排程2個執行例項(預設引數設定)。但目前的hadoop版本中的推測任務判定**重點關注新啟動的備份任務是否有潛力比當前正在執行的任務完成得更早。**如果通過一定的演算法推測某一時刻啟動備份任務,該備份任務肯定會比當前任務完成得晚,那麼啟動該備份任務只會浪費更多的資源

然而,從另乙個角度看,如果推測備份任務比當前任務完成得早,則啟動備份任務會加快資料處理,且備份任務完成得越早,啟動備份任務的價值越大。

mapreduce.map.speculative:是否啟動 map 階段的推測執行,預設為true。其實一般情況設定為 false 比較好。可通過方法 job.setmapspeculativeexecution來設定。

mapreduce.reduce.speculative:是否需要啟動 reduce 階段的推測執行,預設為true,其實一般情 況設定為fase比較好。可通過方法job.setreducespeculativeexecution來設定。

當前判定準則中,最關鍵的是判斷如下三個值。即:

1.當前任務預計結束時間,

2.若啟動新的任務例項,新的任務例項的預計結束時間,以及

3.二者的差值。

那麼顯然,這個差越大說明啟動備份任務的價值越大,因為能夠更早的執行結束。

// 當前任務的預計結束時間 = 任務預計執行時間 + 任務開始時間

long estimatedendtime = estimatedruntime + taskattemptstarttime;

// 備份執行例項預計結束時間 = 新任務預計執行時間 + 當前時間

long estimatedreplacementendtime = now + estimator.estimatednewattemptruntime(taskid);

// 備份任務例項啟動價值 = 當前任務預計結束時間 - 備份執行例項預計結束時間

long result = estimatedendtime - estimatedreplacementendtime;

上面的計算公式裡面,最重要的就是如何去計算estimatedruntime和如何去估計乙個新任務的預計執行時間。而在mapreduce中,這兩個值的計算是比較簡單的。estimateruntime就是按照當前進度和已經花費的時間去線性估計預計執行時間。而對新的任務的執行時間估計,為該作業已完成的同型別任務的平均執行時間。比如作業j的乙個新的map任務的執行時間估計為作業j的所有已經完成的map任務的平均執行時間。

estimateruntime = (currenttimestamp - taskstarttime)/progress

// 以該job的所有task的執行時間均值來估計乙個新任務的執行總時間

@override

public long estimatednewattemptruntime(taskid id)

return (long)statistics.mean();

}

對每乙個任務我們都得到了上述的result結果,即代表備份執行例項啟動價值。那麼我們每次都會選取價值最大的任務來啟動乙個新的備份例項。同時,我們需要滿足:

1.每個任務至多兩個備份例項

2.乙個作業的所有備份例項的總數不能超過所有正在執行任務總數的10%

3.在當前版本中,選取備份任務的**如下:

private int maybescheduleaspeculation(tasktype type) 

int numberspeculationsalready = 0;

int numberrunningtasks = 0;

// loop through the tasks of the kind

job job = context.getjob(jobentry.getkey());

maptasks = job.gettasks(type);

int numberallowedspeculativetasks

= (int) math.max(minimumallowedspeculativetasks,

proportiontotaltasksspeculatable * tasks.size());

taskid besttaskid = null;

long bestspeculationvalue = -1l;

// this loop is potentially pricey.

// todo track the tasks that are potentially worth looking at

for (map.entrytaskentry : tasks.entryset())

if (myspeculationvalue != not_running)

if (myspeculationvalue > bestspeculationvalue)

}numberallowedspeculativetasks

= (int) math.max(numberallowedspeculativetasks,

proportionrunningtasksspeculatable * numberrunningtasks);

// if we found a speculation target, fire it off

if (besttaskid != null

&& numberallowedspeculativetasks > numberspeculationsalready)

}return successes;

}

MapReduce的推測執行(Hive優化)

在分布式集群環境下,因為程式 bug 包括 hadoop 本身的 bug 負載不均衡或者資 源分布不均等原因,會造成同乙個作業的多個任務之間執行速度不一致,有些任務的執行速 度可能明顯慢於其他任務 比如乙個作業的某個任務進度只有 50 而其他所有任務已經 執行完畢 則這些任務會拖慢作業的整體執行進度...

20 mapreduce推測執行機制

1 推測執行機制實際上是hadoop提供的一種針對慢任務的優化方法 當出現慢任務的時候,hadoop會將這個慢任務複製乙份放到其他節點上,兩個節點同時執行相同的任務,誰先執行完,那麼結果就作為最後的結果,另乙個沒有執行完的任務就會被kill掉 2 慢任務出現的場景 任務分配不均勻 機器效能不均等 資...

Hive 推測執行

在分布式集群環境下,因為程式bug 包括hadoop本身的bug 負載不均衡或者資源分布不均等原因,會造成同乙個作業的多個任務之間執行速度不一致,有些任務的執行速度可能明顯慢於其他任務 比如乙個作業的某個任務進度只有50 而其他所有任務已經執行完畢 則這些任務會拖慢作業的整體執行進度。為了避免這種情...