傳統的MapReduce框架慢在那裡

2022-07-11 21:18:13 字數 4060 閱讀 1727

常理上有幾個理由使得mapreduce框架慢於mpp資料庫:

容錯所引入的昂貴資料實體化(data materialization)開銷。

孱弱的資料布局(data layout),比如缺少索引。

執行策略的開銷[1 2]。

而我們對於hive的實驗也進一步證明了上述的理由,但是通過對hive「工程上」的改進,如改變儲存引擎(記憶體儲存引擎)、改善執行架構(partial dag execution)能夠縮小此種差距。同時我們也發現一些mapreduce實現的細節會對效能有巨大的影響,如任務排程的開銷,如果減小排程開銷將極大地提高負載的均衡性。

中間結果輸出:類似於hive這樣的基於mapreduce的查詢引擎,往往會將中間結果實體化(materialize)到磁碟上:

對於第一種情況,map的輸出結果儲存在磁碟上是為了確保能夠有足夠的空間來儲存這些大資料批量任務的輸出。而map的輸出並不會複製到不同的節點上去,因此如果執行map任務的節點失效的話仍會造成資料丟失[3]。由此可以推出,如果將這部分輸出資料快取在記憶體中,而不是全部輸出到磁碟上面也是合理的。shark shuffle的實現正是應用了此推論,將map的輸出結果儲存在記憶體中,極大地提高shuffle的吞吐量。通常對於聚合(aggregation)和過濾之類的查詢,它們的輸出結果往往遠小於輸入,這種設計是非常合理的。而ssd的流行,也會極大地提高隨機讀取的效能,對於大資料量的shuffle,能夠獲得較大的吞吐量,同時也擁有比記憶體更大的空間。

對於第二種情況,一些執行引擎擴充套件了mapreduce的執行模型,將mapreduce的執行模型泛化成更為通用的執行計畫圖(task dag),可以將多stage的任務串聯執行而無需將stage中間結果輸出到hdfs中去,這些引擎包括dryad[4], tenzing[5]和spark[6]。

資料格式和布局(layout)由於mapreduce單純的schema-on-read的處理方式會引起較大的處理開銷,許多系統在mapreduce模型內部設計和使用了更高效的儲存結構來加速查詢。hive本身支援「分割槽表(table partitions)」(一種基本的類索引系統,它將特定的鍵段儲存在特定的檔案中,可以避免對於整個表的掃瞄),類似於磁碟資料的列式儲存結構[7]。在shark中我們更進一步地採用了基於記憶體的列式儲存結構,shark在實現此結構時並沒有修改spark的**,而是簡單地將一組列式元組儲存為spark內的一條記錄,而對於列式元組內的結構則有shark負責解析。

另乙個spark獨有的特性是能夠控制資料在不同節點上的分割槽,這為shark帶來了一種新的功能:對錶進行聯合分割槽(co-partition)

最後,對於rdd我們還未挖掘其隨機讀取的能力,雖然對於寫入操作,rdd只能支援粗粒度的操作,但對於讀取操作,rdd可以精確到每一條記錄[6],這使得rdd可以用來作為索引, tenzing 可以用此來作為join操作的遠端查詢表(remote-lookup)

執行策略:hive在資料shuffle之前花費了大量的時間用來排序,同時將mapreduce結果輸出到hdfs上面也占用了大量的時間,這些都是由於hadoop自身基本的,單次迭代的mapreduce模型所限制的。對於spark這樣的更通用的執行引擎,則可減輕上述問題帶來的開銷。舉例來說,spark支援基於hash的分布式聚合和更為通用任務執行計畫圖(dag)

事實上,為了能夠真正優化關係型查詢的執行,我們發現在基於資料統計的基礎上來選擇執行計畫是非常有必要的。但是由於udf和複雜分析函式的存在,而shark又將其視為一等公民(first-class citizens),這種統計將變得十分困難。為了能夠解決這個問題,我們提出了partial dag execution (pde),這使得spark能夠在基於資料統計的基礎上改變後續執行計畫圖,pde與其他系統(dryadlinq)的執行時執行計畫圖重寫的不同在於:它能夠收集鍵值範圍內的細粒度統計資料;能夠完全重新選擇join的執行策略,如broadcast join,而不僅僅是選擇reduce任務的個數。

任務排程的開銷:大概在諸多影響shark的部分中,最令人感到意外的卻只是乙個純粹工程上的問題:執行任務帶來的開銷。傳統的mapreduce系統,就比如hadoop,是為了首席執行官達數小時的批量作業而設計的,而組成作業的每個任務其執行時間則有數分鐘之久,他們會在獨立的系統程序中執行任務,在某些極端情況下提交乙個任務的延遲非常之高。拿hadoop打比方,它使用週期性的「心跳」訊息來向工作節點分配任務,而這個週期是3秒鐘,因此總共的任務啟動延時就會高達5-10秒。這對於批處理的系統顯然是可以忍受的,但是對於實時查詢這顯然是不夠的。

為了避免上述問題,spark採用了事件驅動的rpc類庫來啟動任務,通過復用工作程序來避免系統程序開銷。它能夠在一秒鐘內啟動上千個任務,任務之間的延時小於5毫秒,從而使得50-100毫秒的任務,500毫秒的作業變得可能。而這種改進對於查詢效能的提公升,甚至對於較長執行時間的查詢效能的提公升也令我們感到吃驚不已。

亞秒級的任務使得引擎能夠更好地在工作節點之間平衡任務的分配,甚至在某些節點遇到了不可預知的延遲(網路延遲或是jvm垃圾**)的情況下面也能較好地平衡。同時對於資料傾斜也有巨大的幫助,考慮到在100個核上做雜湊聚合(hash aggregation),對於每乙個任務所處理的鍵範圍需要精心選定,任何的資料傾斜的部分都會拖慢整個作業。但是如果將作業分發到1000個核上面,那麼最慢的任務只會比平均任務慢10倍,這就大大提高了可接受程度。而當我們在pde中應用傾斜感知的選擇策略後,令我們感到失望的是相比於增大reduce任務個數帶來的提公升,這種策略所帶來的提公升卻比較小。但不可否認的是,引擎對於異常資料傾斜有了更高的穩定性。

在hadoop/hive中,錯誤的選擇任務數量往往會比優化好的執行策略慢上10倍,因此有大量的工作集中在如何自動的選擇reduce任務的數量[8 9],下圖可以看到hadoop/hive和spark reduce任務數量對於作業執行時間的影響。因為spark作業能夠以較小的開銷執行數千個reduce任務,資料傾斜的影響可以通過執行較多工來減小。

事實上,對於更大規模集群(數萬個節點)上亞秒級任務的可行性我們還未**。但是對於dremel[10]這樣的周期性地在數千個節點上執行亞秒級作業的系統,實際情況下當單個主節點無法滿足任務排程的速度時,排程策略可以將任務委派給子集群的「副」主節點。同時細粒度的任務執行策略相比於粗粒度的設計不僅僅帶來了負載均衡的好處,而且還包括快速恢復(fast recovery)(通過將失敗任務分發到更多的節點上去)、查詢的彈性(query elasticity)

雖然這篇文章主要關注的是細粒度任務模型帶來的容錯性優勢,這個模型同樣也提供了許多誘人的特性,接下將會介紹在mapreduce系統中已被證明的兩個特性。

伸縮性(elasticity):在傳統的mpp資料庫中,一旦分布式執行計畫被選中,系統就必須以此並行度執行整乙個的查詢。但是在細粒度任務系統中,在執行查詢的過程中節點可以增刪節點,系統會自動地把阻塞的作業分發到其他節點上去,這使得整個系統變得非常具有伸縮性。如果資料庫管理者需要在這個系統中移除某些節點,系統可以簡單地將這些節點視為失效節點,或者更好的處理方法是將這些節點上的資料複製到其他節點上去。與刪除節點相對應的是,當執行查詢變得更慢時,資料庫系統可以動態地申請更多的資源來提公升計算能力。亞馬遜的elastic mapreduce[11]已經支援執行時調整集群規模。

多租戶架構(multitenancy):多租戶架構如同上面提到伸縮性一樣,目的是為了在不同使用者之間動態地共享資源。在傳統的mpp資料庫中,當乙個重要的查詢提交的時候已經有乙個較大的查詢佔據了大多數的集群資源,這時能做的選擇不外乎就是取消先前的查詢等有限的操作。而在基於細粒度任務模型的系統中,查詢作業可以等待幾秒到當前作業完成,然後提交新的查詢作業。facebook和microsoft已經為hadoop和dryad開發了公平排程器,使得大型的、計算密集型的歷史記錄查詢與實時的小型查詢可以共享集群資源而不會產生飢餓現象[12 13]。

MapReduce框架原理

mapreduce工作流程 reduce端 2 流程詳解 上面的流程是整個mapreduce最全工作流程,但是shuffle過程只是從第7步開始到第15步結束,具體shuffle過程詳解,如下 1 maptask 收集我們的 map 方法輸出的 kv對,放到記憶體緩衝區中 2 從記憶體緩衝區不斷溢位...

MapReduce 框架原理

1.1 自定義bean物件實現序列化介面 乙個job在map階段並行度由客戶端在提交job時的切片數決定 每乙個split切片分配乙個 maptask 並行例項處理 預設情況,切片大小 blocksize 示例 a.txt line1 rich learning form line2 intelli...

Mapreduce框架的相關問題

jobtracker的單點故障 jobtracker和hdfs的namenode一樣也存在單點故障,單點故障一直是hadoop被人詬病的大問題,為什麼hadoop的做的檔案系統和mapreduce計算框架都是高容錯的,但是最重要的管理節點的故障機制卻如此不好,我認為主要是namenode和jobtr...