關於Spark中RDD的思考和總結

2021-09-02 10:09:47 字數 2853 閱讀 7591

(**基於spark-core 1.2.0)

本來這篇想結合自己的經驗討論shuffle,但是shuffle討論之前還是準備先討論一下關於rdd的問題。 網上介紹rdd的我看過的有:

0、 spark ***** 這個是設計時候的*****

1、 2、 乙個介紹rdd甚好的ppt,之後我會發在我的雲盤。 

rdd學名resilient distributed dataset,"resillient"表示它的容錯能力,「distributed」說明它是分布式的實現,"dataset"說明它是基於集合操作的, rdd更準確的說是乙個介面,主要由5個介面或者說特性組成:

compute             :在action呼叫,生成乙個job時,rdd如果沒有cache的時候呼叫這個函式

getpartitions       :array[partitions] 每個分割槽由 index標示

getdependencies    : rdd的父rdds, 通常rdd有parent rdd,除了hadooprdd以及類似rdd,這些是資料源頭,所有rdd的關係構成rdd的lineage,一般翻譯成血緣關係

getpreferredlocations 

partitioner

總體而言,spark的主要介面分為transformation以及action,action觸發job的執行。 job執行時

1、sparkcontext呼叫dagscheduler劃分stage,stage劃分的依據是shuffle,shuffle前後屬於不同的stage。 2、stage封裝成taskset,提交給taskscheduler。

3、taskscheduler呼叫backend,獲取可用的executor資訊, 對於每乙個taskset中的task分配乙個executor的core。

5、taskrunner反序列化taskdescription,執行task, task的子類包括resulttask,和shufflemaptask, resulttask 的主要工作是執行 func(context, rdd.iterator(partition, context)) func是我們寫的spark程式的action函式。shufflemaptask則是拉取shuffle儲存的資料,主要的邏輯是 fetchiterator-> iterator.read。

每個人看原始碼時碰到想到的問題會是不一樣的,我碰到的兩個具體問題分別是:

1、 每個rdd的compute方法是如何工作的。 

2、 每個stage有parent stages, 但是為何stage包含的是option[shuffledependency],而rdd包含乙個物件seq[dependency]

這兩個問題難度一般,主要是對於rdd和stage生成過程不清晰導致的。 rdd的生成過程是從前向後

val text = sc.textfile("some/path/hold/data")

val rdd 1 = text.flatmap(_.split(" "))

val rdd2 = rdd1.map(w=>(w,1))

val rdd3 = rdd2.reducebykey(_+_)

那麼存在 text->rdd1->rdd2->rdd3的lineage。而stage的產生是dagscheduler從後向前劃分產生的。從後向前的**依次為: 

var finalstage: stage = null

try

private def getparentstages(rdd: rdd[_], jobid: int): list[stage] = }}

}waitingforvisit.push(rdd)

while (!waitingforvisit.isempty)

parents.tolist

}

private def getshufflemapstage(shuffledep: shuffledependency[_, _, _], jobid: int): stage = 

}

private def neworusedstage(

rdd: rdd[_],

numtasks: int,

shuffledep: shuffledependency[_, _, _],

jobid: int,

callsite: callsite)

: stage =

else

}}

至此,dagscheduler生成stage問題梳理清楚了。 

第二個問題是compute的作用。這個比較簡單

(rdd)

final def iterator(split: partition, context: taskcontext): iterator[t] = else

}private[spark] def computeorreadcheckpoint(split: partition, context: taskcontext): iterator[t] =

override def compute(split: partition, context: taskcontext) =

firstparent[t].iterator(split, context).map(f)

}

又如shuffledrdd

override def compute(split: partition, context: taskcontext): iterator[(k, c)] =
這裡插一句 shuffledrdd是shuffle完成之後,向shufflemanager去取資料,而寫資料則是在shufflemaptask中。 這將在以後一節詳細介紹。

下節介紹spark的shuffle過程

對spark中RDD的理解

update at 2016.1.25 rdd作者的 鏈結 的理解 spark要解決的問題 1 有些資料要多次讀寫,磁碟速度很慢 2 沒有互動的介面,不能看到中間結果 適用的應用 1 機器學習 多個迭代次運算,逼近 優化問題 是不是三維重建中優化也可以用到這個 2 計算結果還要用的 pagerank...

spark中建立RDD的方式

spark中建立rdd的幾種方式 1 使用程式中的集合建立rdd 一般用於測試 2 使用本地檔案系統建立rdd 一般用於資料量大的檔案的測試 3 基於hdfs建立rdd 生產環境最常用的rdd建立方式 4 使用s3建立rdd 5 基於資料流建立rdd packagecom.dt.spark impo...

Spark開發 spark執行原理和RDD

核心 1 spark執行原理 2 rdd 1 spark執行原理 spark應用程式基本概念spark基本工作流程spark 應用程式程式設計模型 1 driver program sparkcontext 1 1匯入spark的類和隱式轉換 1 2構建spark應用程式的執行環境 sparkcon...