Spark RDD 建立和分割槽規則

2021-10-10 22:50:59 字數 3561 閱讀 8831

1.1 從集合中建立rdd

從集合中建立rdd,主要提供了兩種函式:parallelize和makerdd

package com.xcu.bigdata.spark.core.pg02_rdd.pg021_rdd_create

import org.apache.spark.rdd.rdd

import org.apache.spark.

/** * @package : com.xcu.bigdata.spark.core.pg02_rdd.pg021_rdd_create

* @author :

* @date : 2020 11月 星期二

* @desc : 從集合中建立rdd

*/object spark01_createrdd_memory

}

1.2 從外部儲存建立rdd

由外部儲存系統的資料集建立rdd包括:本地的檔案系統,還有所有hadoop支援的資料集,比如hdfs、hbase等

package com.xcu.bigdata.spark.core.pg02_rdd.pg021_rdd_create

import org.apache.spark.rdd.rdd

import org.apache.spark.

/** * @package : com.xcu.bigdata.spark.core.pg02_rdd.pg021_rdd_create

* @author :

* @date : 2020 11月 星期二

* @desc : 從外部儲存建立rdd

*/object spark02_createrdd_file

}

2.1 記憶體分割槽規則和資料讀取規則

建立rdd的方式不一樣,分割槽規則不一樣

2.1 .1 切片規則

1) 預設的分割槽數取決於分配給應用的cpu的核數

2) 如果指定分割槽,那麼最終分割槽數就為指定的數目

2.1 .2 資料讀取規則

def positions(length:

long

, numslices:

int)

: iterator[

(int

,int)]

=}

資料讀取舉例:sc.makerdd(list(1, 2, 3, 4, 5), numslices = 3)

根據以上規則

分割槽 ---> [start end) ---> data

0 ---> [0, 1) ---> 1

1 ---> [1, 3) ---> 2 3

2 ---> [3, 5) ---> 4 5

package com.xcu.bigdata.spark.core.pg02_rdd.pg021_rdd_create

import org.apache.spark.rdd.rdd

import org.apache.spark.

/** * @package : com.xcu.bigdata.spark.core.pg02_rdd.pg021_rdd_create

* @author :

* @date : 2020 11月 星期三

* @desc : 從集合中建立rdd,並指定分割槽

* -預設的分割槽規則取決於分配給應用的cpu的核數

* -如果指定分割槽,那麼最終分割槽數就為指定的數目

*/object spark04_partition_mem

}

2.2 檔案分割槽規則和資料讀取規則

從檔案中建立rdd,分割槽規則採用的是haoop的分割槽和資料讀取規則:

1)檔案切片規則:以位元組方式來切片`

讀取外部檔案建立rdd(預設分割槽數取決於:math.min(分配給應用的cpu核數,2))

2 ) 資料讀取規則:以行為單位來讀取`

如果指定分割槽,那麼最後分割槽數取決於總的位元組數是否能夠整除預計的最小分割槽數,並且剩餘的位元組數達到乙個比率,那麼這也就導致了實際的分割槽數量,可能大於預計的最小分割槽數

切片舉例:例如乙個檔案11個位元組,設定minpartitions=2

minpartitions=2:即預計的最小分割槽數為2

goalsize = totalsize / numslices

11 / 2 = 5 ... 1 (1 / 5 )與 0.1 做比較,如果(1 / 5 = 0.2)大0.1, 那麼在開乙個分割槽,如果(1 / 5 )小0.1,那麼合併到最後乙個分割槽,故實際分割槽個數為3個

資料讀取舉例:例如乙個txt檔案10個位元組,minpartitions=4

txt檔案如下

123

4

顯示空格和換行,並用符號@代替

# 資料以行的方式讀取,但是會考慮偏移量(資料的offset)的設定

1@@ => 012

2@@ => 345

3@@ => 678

4 => 9

根據以上的分割槽規則我們知道,分為5個區

10byte / 4 = 2byte ... 2byte =>5個區

每個分割槽的偏移量為

0 =>(0, 2)

1 =>(2, 4)

2 =>(4, 6)

3 =>(6, 8)

4 =>(8, 10)

實際讀取的資料為

0 =>(0, 2)=> 1

1 =>(2, 4)=> 2

2 =>(4, 6)=> 3

3 =>(6, 8)=> 此處因為上一次已經把3給讀走了,所以當3被讀走後連帶著偏移量7 8也沒有了

4 =>(8, 10)=> 4

package com.xcu.bigdata.spark.core.pg02_rdd.pg021_rdd_create

import org.apache.spark.rdd.rdd

import org.apache.spark.

/** * @package : com.xcu.bigdata.spark.core.pg02_rdd.pg021_rdd_create

* @author :

* @date : 2020 11月 星期三

* @desc : 從檔案中建立rdd,分割槽規則和資料讀取規則

*/object spark05_partition_file

}

SparkRDD的分割槽

rdd的分割槽,在運算元裡面未指定rdd的分割槽的時候,預設的分割槽數和核數相同,同理也會啟動相應的task個數 原始碼中的分割槽數預設是2 sc.textfile 其中分割槽數和讀取的小檔案數相同,都小於128m,基於spark2.2.0的,textfile預設是呼叫的是hadoop的textfi...

Spark RDD分割槽數和分割槽器

partition rdd有個partitions方法 final def partitions array partition 能夠返回乙個陣列,陣列元素是rdd的partition。partition是rdd的最小資料處理單元,可以看作是乙個資料塊,每個partition有個編號index。乙個...

舉例說明Spark RDD的分割槽 依賴

例子如下 scala val textfilerdd sc.textfile users zhuweibin downloads hive 04053f79f32b414a9cf5ab0d4a3c9daf.txt 15 08 03 07 00 08 info memorystore ensurefr...