影響Spark輸出RDD分割槽的操作函式

2021-07-28 22:15:50 字數 2195 閱讀 3760

cogroup,groupwith,join,leftouterjoin,rightouterjoin,groupbykey,reducebykey,combinebykey,partitionby,sort,mapvalues(如果父rdd存在partitioner),flatmapvalues(如果父rdd存在partitioner), 和filter(如果父rdd存在partitioner)。

其他的transform操作不會影響到輸出rdd的partitioner,一般來說是none,也就是沒有partitioner。

下面舉個例子進行說明: 

scala> val pairs = sc.parallelize(list((1, 1), (2, 2), (3, 3)))

pairs: org.apache.spark.rdd.rdd[(int, int)] =

parallelcollectionrdd[4] at parallelize at :12

scala> val a = sc.parallelize(list(2,51,2,7,3))

a: org.apache.spark.rdd.rdd[int] =

parallelcollectionrdd[5] at parallelize at :12

scala> val a = sc.parallelize(list(2,51,2))

a: org.apache.spark.rdd.rdd[int] =

parallelcollectionrdd[6] at parallelize at :12

scala> val b = sc.parallelize(list(3,1,4))

b: org.apache.spark.rdd.rdd[int] =

parallelcollectionrdd[7] at parallelize at :12

scala> val c = a.zip(b)

c: org.apache.spark.rdd.rdd[(int, int)] =

zippedpartitionsrdd2[8] at zip at :16

scala> val result = pairs.join(c)

result: org.apache.spark.rdd.rdd[(int, (int, int))] =

scala> result.partitioner

res6: option[org.apache.spark.partitioner] = some(org.apache.spark.hashpartitioner@2)

大家可以看到輸出來的rdd result分割槽變成了hashpartitioner,因為join中的兩個分割槽都沒有設定分割槽,所以預設用到了hashpartitioner,可以看join的實現:

def join[w](other: rdd[(k, w)]): rdd[(k, (v, w))] = 

def defaultpartitioner(rdd: rdd[_], others: rdd[_]*): partitioner =

if (rdd.context.conf.contains("spark.default.parallelism")) else

}

defaultpartitioner函式就確定了結果rdd的分割槽。從上面的實現可以看到, 

1、join的兩個rdd如果都沒有partitioner,那麼join結果rdd將使用hashpartitioner; 

2、如果兩個rdd中其中有乙個有partitioner,那麼join結果rdd將使用那個父rdd的partitioner; 

3、如果兩個rdd都有partitioner,那麼join結果rdd就使用呼叫join的那個rdd的partitioner。 

spark 的RDD分割槽

rdd的倆種建立方 1.從集合中建立rdd,spark主要提供了兩種函式 parallelize和makerdd 使用parallelize 從集合建立 scala val rdd sc.parallelize array 1,2,3,4,5,6,7,8 使用makerdd 從集合建立 scala ...

Spark中RDD分割槽以及節點

spark中rdd分割槽 對於二元rdd使用時,例如在使用join 時 我們對資料集是如何分割槽的卻一無所知。預設情況下,連線操作會將兩個資料集中的所有鍵的雜湊值都求出來,將該雜湊值相同的記錄通過網路傳到同一臺機器 上,然後在那台機器上對所有鍵相同的記錄進行連線操作,會非常消耗效能,如果乙個資料集設...

Spark中RDD的分割槽數時如何的?

看目錄可能方便val rdd sc.parallelize list,6 分割槽數 指定分割槽數 val sc new sparkcontext new sparkconf set spark.default.parallelism 10 setmaster local 4 test 預設分割槽數 ...