Spark RDD Partitions and Dependencies, by Example

2021-07-04 08:23:48 · 3758 words · 8388 reads

Here is an example:

scala> val textFileRDD = sc.textFile("/users/zhuweibin/downloads/hive_04053f79f32b414a9cf5ab0d4a3c9daf.txt")
15/08/03 07:00:08 INFO MemoryStore: ensureFreeSpace(57160) called with curMem=0, maxMem=278019440
15/08/03 07:00:08 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 55.8 KB, free 265.1 MB)
15/08/03 07:00:08 INFO MemoryStore: ensureFreeSpace(17237) called with curMem=57160, maxMem=278019440
15/08/03 07:00:08 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 16.8 KB, free 265.1 MB)
15/08/03 07:00:08 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:51675 (size: 16.8 KB, free: 265.1 MB)
15/08/03 07:00:08 INFO SparkContext: Created broadcast 0 from textFile at <console>:21
textFileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

scala> println( textFileRDD.partitions.size )
15/08/03 07:00:09 INFO FileInputFormat: Total input paths to process : 1
2

scala> textFileRDD.partitions.foreach { partition =>
     |   println("index:" + partition.index + " hashCode:" + partition.hashCode)
     | }
index:0 hashCode:1681
index:1 hashCode:1682

scala> println("dependency size:" + textFileRDD.dependencies)
dependency size:List(org.apache.spark.OneToOneDependency@543669de)

scala> println( textFileRDD )

scala> textFileRDD.dependencies.foreach { dep =>
     |   println("dependency type:" + dep.getClass)
     |   println("dependency rdd:" + dep.rdd)
     |   println("dependency partitions:" + dep.rdd.partitions)
     |   println("dependency partitions size:" + dep.rdd.partitions.size)
     | }
dependency type:class org.apache.spark.OneToOneDependency
dependency rdd:/users/zhuweibin/downloads/hive_04053f79f32b414a9cf5ab0d4a3c9daf.txt HadoopRDD[0] at textFile at <console>:21
dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2

scala>

scala> val flatMapRDD = textFileRDD.flatMap(_.split(" "))
flatMapRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:23

scala> println( flatMapRDD )

scala> flatMapRDD.dependencies.foreach { dep =>
     |   println("dependency type:" + dep.getClass)
     |   println("dependency partitions:" + dep.rdd.partitions)
     |   println("dependency partitions size:" + dep.rdd.partitions.size)
     | }
dependency type:class org.apache.spark.OneToOneDependency
dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2

scala>

scala> val mapRDD = flatMapRDD.map(word => (word, 1))
mapRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:25

scala> println( mapRDD )

scala> mapRDD.dependencies.foreach { dep =>
     |   println("dependency type:" + dep.getClass)
     |   println("dependency partitions:" + dep.rdd.partitions)
     |   println("dependency partitions size:" + dep.rdd.partitions.size)
     | }
dependency type:class org.apache.spark.OneToOneDependency
dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2

scala>

scala> val counts = mapRDD.reduceByKey(_ + _)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:27

scala> println( counts )
ShuffledRDD[4] at reduceByKey at <console>:27

scala> counts.dependencies.foreach { dep =>
     |   println("dependency type:" + dep.getClass)
     |   println("dependency partitions:" + dep.rdd.partitions)
     |   println("dependency partitions size:" + dep.rdd.partitions.size)
     | }
dependency type:class org.apache.spark.ShuffleDependency
dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2

scala>

From this output we can see that for any RDD x, its dependencies field lists the RDDs it directly depends on (one or more). How do these dependencies express the relationship between RDDs? Take any dependency that is an element of dependencies: its rdd member points to the parent RDD, as the "dependency rdd:" lines above show.
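To make this concrete, here is a minimal plain-Scala sketch of the idea. It is a toy model, not Spark's real RDD and Dependency classes: ToyRDD, OneToOneDep, and ShuffleDep are invented names used only for illustration.

```scala
// Toy model of RDD lineage. Each dependency keeps a reference to its parent RDD.
sealed trait Dep { def rdd: ToyRDD }
// Narrow dependency: partition i of the child maps to partition i of the parent.
case class OneToOneDep(rdd: ToyRDD) extends Dep
// Wide dependency: a child partition may read every parent partition (a shuffle).
case class ShuffleDep(rdd: ToyRDD) extends Dep

case class ToyRDD(id: Int, name: String, dependencies: List[Dep])

// The lineage of the word-count example above: textFile -> flatMap -> map -> reduceByKey.
val textFile = ToyRDD(1, "textFile", Nil)
val flatMap  = ToyRDD(2, "flatMap", List(OneToOneDep(textFile)))
val mapped   = ToyRDD(3, "map", List(OneToOneDep(flatMap)))
val counts   = ToyRDD(4, "reduceByKey", List(ShuffleDep(mapped)))

// Walking dependencies, like the foreach calls in the REPL session:
counts.dependencies.foreach { dep =>
  println("dependency type:" + dep.getClass.getSimpleName)
  println("dependency rdd:" + dep.rdd.name + "[" + dep.rdd.id + "]")
}
```

The point of the sketch is only that dependencies forms a linked chain from each RDD back to its parents; Spark additionally distinguishes narrow (OneToOneDependency) from wide (ShuffleDependency) links, which is what the last REPL block above shows for reduceByKey.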

So if computing a partition of some RDD fails, how far back along the chain must we trace? The dependency.rdd values printed in the example above are:

MapPartitionsRDD[1] at textFile at <console>:21
MapPartitionsRDD[2] at flatMap at <console>:23
MapPartitionsRDD[3] at map at <console>:25
ShuffledRDD[4] at reduceByKey at <console>:27

As you can see, each RDD carries a sequence number. During backtracking, each step upward yields one or more parent RDDs, and at each step the system checks whether that RDD is already available (i.e., cached). If it is, backtracking stops there; if not, it keeps going upward until it reaches a cached RDD, or, failing that, the original RDD's data source.
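The backtracking rule just described can be sketched in a few lines of plain Scala. Again this is a toy model under stated assumptions: Node and backtrack are invented names, a boolean `cached` flag stands in for a persisted RDD, and real Spark performs this reasoning per partition inside its scheduler.

```scala
// Toy lineage node: `cached` marks an RDD whose data is already available.
case class Node(id: Int, name: String, parents: List[Node], cached: Boolean = false)

// Walk up the lineage, stopping at a cached RDD or at the original data source.
def backtrack(rdd: Node): List[Node] =
  if (rdd.cached || rdd.parents.isEmpty) List(rdd) // stop: available, or nothing above it
  else rdd :: rdd.parents.flatMap(backtrack)

val textFile = Node(1, "textFile", Nil)
val flatMap  = Node(2, "flatMap", List(textFile), cached = true) // suppose this RDD was persisted
val mapped   = Node(3, "map", List(flatMap))
val counts   = Node(4, "reduceByKey", List(mapped))

// Recomputing a failed partition of `counts` only needs to go back to the cached flatMap RDD:
println(backtrack(counts).map(_.name)) // List(reduceByKey, map, flatMap)
```

Because flatMap is marked as cached, the walk never reaches textFile or the file on disk; remove the `cached = true` flag and the same call would trace all the way back to the data source.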
