Sparkr入門 二 Spark架構

2021-10-09 13:57:06 字數 4160 閱讀 1958

一、概述

傳統的hadoop mapreduce計算模型適用於計算非迭代任務,因為mr得中間結果是存放在本地磁碟上的,這樣做是為了防止reduce任務執行失敗。所以,當使用迭代任務時,就需要多次訪問磁碟,造成效率低下。如果把這部分需要重複使用的資料放在記憶體中,則可以省去很多次訪問磁碟的時間。

spark則是使用rdd作為資料抽象,可以通過cache()可以將當前rdd所代表的資料儲存在記憶體中,使得我們可以在記憶體中直接重用之前的資料。所以,spark很適合用於迭代計算,這也是spark任務處理速度比mr快的原因之一。

二、程式設計模型

1.rdd

rdd是一組物件的唯讀集合,所有的轉換操作都是基於乙個rdd重新構建另乙個rdd。rdd不需要儲存在外存上,因為rdd得控制代碼包含了足夠的資訊,可以通過這些資訊從某個rdd或者原始資料進行計算重構rdd。所以不需要向hadoop那樣需要把所有的中間結果存在外存上,訪問效率就高了很多。

rdd有四種構建方式:

2、共享變數

我們客戶端與spark通訊是通過乙個sparkcontext型別的變數,spark中的各個rdd轉換操作採用了函式式程式設計,需要我們向其中傳遞函式和資料,集群中的每個任務都會得到這些資料乙個副本,更新這些副本資料不會影響我們驅動器程式中的變數,所以spark提供了累加器廣播變數兩個共享變數。

累加器用於將各個工作節點中的值聚合到驅動器程式中。

廣播變數使得我們可以高效的向所有的工作節點傳送乙個較大的唯讀值,以供乙個或多個spark操作使用,避免每個操作都需要向所有節點傳送一次這個資料。

三、實現

spark構建在mesos之上,mesos是乙個"集群作業系統",它允許多個並行應用程式以細粒度的方式共享乙個集群。

所有的rdd都實現了同乙個介面,這個介面包含三個方法:

每當對rdd進行並行操作時,spark都會建立乙個任務來處理該資料集所劃分的每個分割槽,將任務傳送到包含這些分割槽的節點上(計算向資料移動)。首先得到所有的分割槽列表,然後根據每個分割槽計算出分割槽所在的最近節點,然後把任務傳送到對應節點,使用getiterator,將該節點所含的分割槽id傳入,遍歷該分割槽的資料,進行操作。

廣播變數的實現,是將廣播變數存在檔案系統中,同時將檔案的路徑序列化傳到工作節點,當工作節點使用廣播變數時,先檢查其是否在快取中,如果不在則反序列檔案路徑,然後去對應的路徑上取資料。

累加器,則是在建立時被賦予乙個唯一的id,當工作節點執行任務後,工作節點會向驅動程式傳送一條訊息,其中包含自己對累加器所做的全部更新,驅動程式對每個更新操作在每個分割槽應用一次(防止由於失敗而重新執行任務出現重複計數)。

spark整合到scala,scala直譯器通常將使用者輸入的每一行都編譯為乙個類來操作,此類是乙個單例物件,包含該行上的資料或函式,並在其構造器中執行該行**,例如:

var x =5;

println

(x),

//直譯器會定義乙個包含x的類(例如line1),並為x賦值5,

//然後在第二行構造類line2的構造器執行println(line1.getinstance().x)

同時,spark稍微更改了一下這種解釋方式,第一是將每行生成的單例物件儲存到了乙個公共的檔案系統上,同時生成的單例物件直接引用前幾行生成的單例物件,而不是通過前幾行生成的單例物件呼叫getinstance方法來獲得引用。

四、spark架構

spark生態包括spark core、spark sql、spark streaming、mlib、graphx及部分。

4.1 生態架構圖

4.2 spark core的架構組成

4.3spark任務執行流程

具體執行流程:

(1)當乙個spark應用被提交後,即執行main函式,啟動main函式的這個節點就成為了該應用的driver節點,然後driver建立乙個sparkcontext(在driver節點上)。

(2)sparkcontext向cluster manager註冊並提交任務,然後申請執行executor的資源。

(3)資源管理器(cluster manager)會與所有worker通訊,找出空閒的worker,為executor分配節點和資源,啟動executor程序,executor向資源管理器傳送心跳。

(4)executor向sparkcontext申請task

(5)sparkcontext根據rdd的依賴關係構建dag圖,然後將dag提交給dag排程器(dagscheduler)進行解析,將dag分為成多個「階段」(每個階段乙個任務集),並且計算出各個階段之間的依賴關係,然後將任務集(taskset,即stage)提交給下層的taskschedule(任務排程器)進行處理並派發任務。

(6)task scheduler將task傳送給executor執行,同時sparkcontext將應用程式**傳送給executor

(7)executor執行完畢,將結果返回給任務排程器,然後由任務排程器返回給dag排程器,最後返回給使用者,然後釋放資源。

詳細流程圖:

4.4 stage的劃分

窄依賴:父rdd的每乙個分割槽最多被乙個子rdd的分割槽使用,可以理解為獨生子女。可以是乙個父rdd轉換為乙個子rdd的分割槽,或者多個父rdd的分割槽對應乙個子rdd的分割槽,例如spark中的map運算元。

寬依賴:除窄依賴以外的所有依賴,例如子rdd的分割槽依賴父rdd的所有分割槽,乙個父rdd對應多個子rdd,即需要shuffle的情況,例如groupbykey等操作。

stage:stage劃分是按照shuffle操作來劃分的,即按寬依賴劃分stage,shuffle之前的所有操作算作乙個stage,shuffle之後的所有操作算另乙個stage。

spark的dag排程器會將rdd的依賴關係轉換為dag圖,對於窄依賴,由於分割槽是一對一的(對於一對父子),所以分割槽轉換可以在乙個執行緒內完成,可以劃分到乙個stage中。寬依賴,需要spark在所有分割槽上進行shuffle,所以會打亂所有分割槽上原有資料分布,需要在每個worker上進行,只有等shuffle完成,下乙個操作才能繼續,所以stage按照寬依賴劃分。

spark會根據dag圖,從後向前推進,遇到乙個寬依賴就劃分出乙個stage,遇到窄依賴就將其加入當前stage。task的數量由每個stage中最後乙個操作所轉換出來的rdd的分割槽數量決定的(每個分割槽跑乙個task)

4.5 優點

總體而言,spark執行架構具有以下特點:

(1)每個應用都有自己專屬的executor程序,並且該程序在應用執行期間一直駐留。executor程序以多執行緒的方式執行任務,減少了多程序任務頻繁的啟動開銷,使得任務執行變得非常高效和可靠;

(2)spark執行過程與資源管理器無關,只要能夠獲取executor程序並保持通訊即可;

(3)executor上有乙個blockmanager儲存模組,類似於鍵值儲存系統(把記憶體和磁碟共同作為儲存裝置),在處理迭代計算任務時,不需要把中間結果寫入到hdfs等檔案系統,而是直接放在這個儲存系統上,後續有需要時就可以直接讀取;在互動式查詢場景下,也可以把錶提前快取到這個儲存系統上,提高讀寫io效能;

(4)任務採用了資料本地性和推測執行等優化機制。資料本地性是盡量將計算移到資料所在的節點上進行,即「計算向資料靠攏」,因為移動計算比移動資料所佔的網路資源要少得多。而且,spark採用了延時排程機制,可以在更大的程度上實現執行過程優化。比如,擁有資料的節點當前正被其他的任務占用,那麼,在這種情況下是否需要將資料移動到其他的空閒節點呢?答案是不一定。因為,如果經過**發現當前節點結束當前任務的時間要比移動資料的時間還要少,那麼,排程就會等待,直到當前節點可用。

Spark 面板入門

spark 是 flex 4 中的乙個新特性。spark.skins 包包含在 spark 命名空間中,並且已在adobe 的 flex 4 livedocs 中說明 自定義 spark 為 mxml 檔案,定義組成 spark 元件 的邏輯 圖形元素和其他物件 那麼這對於您意味著什麼呢?我希望這篇...

Spark入門系列

讀完spark官方文件後,在研究別人的原始碼以及spark的原始碼之前進行一番入門學習,這個系列不錯。spark系列 除此之外,databricks也是乙個非常不錯的 上面可以使用免費的spark集群進行 提交與測試,在youtube以及spark大會中都有其發布教程以及spark應用部署的相關細節...

Spark 入門詳解

redis資料持久化什麼作用?將記憶體中的資料寫入到硬碟中,進行永久儲存 防止資料丟失!rdd資料持久化什麼作用?1 對多次使用的rdd進行快取,快取到記憶體,當後續頻繁使用時直接在記憶體中讀取快取的資料,不需要重新計算。2 將rdd結果寫入硬碟 容錯機制 當rdd丟失資料時,或依賴的rdd丟失資料...