RDD轉換運算元 單值value

2022-09-11 04:51:14 字數 4651 閱讀 8805

sparks運算元總結:

lvalue型別

1)     map

def map[u: classtag](f: t => u): rdd[u]   //單值處理邏輯

將處理的資料逐條進行對映轉換,這裡的轉換可以是型別的轉換,也可以是值的轉換。

val datardd: rdd[int] = sparkcontext.makerdd(list(1,2,3,4))

val datardd1: rdd[int] = datardd.map(

num =>

)f: iterator[t] => iterator[u],   //迭代方式

preservespartitioning: boolean = false): rdd[u]   //是否保留父分割槽

將待處理的資料以分割槽為單位傳送到計算節點進行處理,這裡的處理是指可以進行任意的處理,哪怕是過濾資料

datas =>

)f: (int, iterator[t]) => iterator[u],   //(分割槽號,迭代方式)

preservespartitioning: boolean = false): rdd[u]

將待處理的資料以分割槽為單位傳送到計算節點進行處理,這裡的處理是指可以進行任意的處理,哪怕是過濾資料,在處理時同時可以獲取當前分割槽索引。

(index, datas) =>

)4)     flatmap

def flatmap[u: classtag](f: t => tr**ersableonce[u]): rdd[u]

將處理的資料先進行扁平化後再進行對映處理,所以運算元也稱之為扁平對映

val datardd = sparkcontext.makerdd(list(

list(1,2),list(3,4)

),1)

val datardd1 = datardd.flatmap(

list => list

)5)     glom

def glom(): rdd[array[t]]

將同乙個分割槽的資料直接轉換為相同型別的記憶體陣列進行處理,分割槽不變

val datardd = sparkcontext.makerdd(list(

1,2,3,4

),1)

val datardd1:rdd[array[int]] = datardd.glom()

6)     groupby

def groupby[k](f: t => k)(implicit kt: classtag[k]): rdd[(k, iterable[t])] 

//第乙個引數為分割槽規則,第二個引數預設的隱式轉換  

將資料根據指定的規則進行分組, 分割槽預設不變,但是資料會被打亂重新組合,我們將這樣的操作稱之為shuffle。極限情況下,資料可能被分在同乙個分割槽中

乙個組的資料在乙個分割槽中,但是並不是說乙個分割槽中只有乙個組

val datardd = sparkcontext.makerdd(list(1,2,3,4),1)

val datardd1 = datardd.groupby(

_%2)

7)     filter

def filter(f: t => boolean): rdd[t]

將資料根據指定的規則進行篩選過濾,符合規則的資料保留,不符合規則的資料丟棄。

當資料進行篩選過濾後,分割槽不變,但是分區內的資料可能不均衡,生產環境下,可能會出現資料傾斜。

val datardd = sparkcontext.makerdd(list(

1,2,3,4

),1)

val datardd1 = datardd.filter(_%2 == 0)

8)     sample

def sample(

withreplacement: boolean,  //是否放回

fraction: double,  //不放回情況為(0~1代表每個元素抽取機率)放回(每個元素期望的抽取次數)

seed: long = utils.random.nextlong): rdd[t] //隨機數種子,種子確定後每次抽取的元素固定

根據指定的規則從資料集中抽取資料

val datardd = sparkcontext.makerdd(list(

1,2,3,4

),1)

// 抽取資料不放回(伯努利演算法)

// 伯努利演算法:又叫0、1分布。例如扔硬幣,要麼正面,要麼反面。

// 具體實現:根據種子和隨機演算法算出乙個數和第二個引數設定機率比較,小於第二個引數要,大於不要

// 第乙個引數:抽取的資料是否放回,false:不放回

// 第二個引數:抽取的機率,範圍在[0,1]之間,0:全不取;1:全取;

// 第三個引數:隨機數種子

val datardd1 = datardd.sample(false, 0.5)

// 抽取資料放回(泊松演算法)

// 第乙個引數:抽取的資料是否放回,true:放回;false:不放回

// 第二個引數:重複資料的機率,範圍大於等於0.表示每乙個元素被期望抽取到的次數

// 第三個引數:隨機數種子

val datardd2 = datardd.sample(true, 2)

9)     distinct

def distinct()(implicit ord: ordering[t] = null): rdd[t]

def distinct(numpartitions: int)(implicit ord: ordering[t] = null): rdd[t] //去重後重新分割槽數

將資料集中重複的資料去重

val datardd = sparkcontext.makerdd(list(

1,2,3,4,1,2

),1)

val datardd1 = datardd.distinct()

val datardd2 = datardd.distinct(2)

10)   coalesce

def coalesce(numpartitions: int, shuffle: boolean = false, 

//第乙個引數為分割槽數,第二個引數是否進行shuffle操作  縮減分割槽使用預設值,擴大分割槽設定為true

partitioncoalescer: option[partitioncoalescer] = option.empty)

(implicit ord: ordering[t] = null)

: rdd[t]

根據資料量縮減分割槽,用於大資料集過濾後,提高小資料集的執行效率

當spark程式中,存在過多的小任務的時候,可以通過coalesce方法,收縮合併分割槽,減少分割槽的個數,減小任務排程成本

val datardd = sparkcontext.makerdd(list(

1,2,3,4,1,2

),6)

val datardd1 = datardd.coalesce(2)

思考乙個問題:我想要擴大分割槽,怎麼辦?

coalesce方法預設情況下無法擴大分割槽,因為預設不會將資料打亂重新組合。擴大分割槽是沒有意義。如果想要擴大分割槽,那麼必須使用shuffle,打亂資料,重新組合。

11)   repartition

def repartition(numpartitions: int)(implicit ord: ordering[t] = null): rdd[t]

該操作內部其實執行的是coalesce操作,引數shuffle的預設值為true。無論是將分割槽數多的rdd轉換為分割槽數少的rdd,還是將分割槽數少的rdd轉換為分割槽數多的rdd,repartition操作都可以完成,因為無論如何都會經shuffle過程。

val datardd = sparkcontext.makerdd(list(

1,2,3,4,1,2

),2)

val datardd1 = datardd.repartition(4)

思考乙個問題:coalesce和repartition區別?

repartition方法其實就是coalesce方法,只不過肯定使用了shuffle操作。讓資料更均衡一些,可以有效防止資料傾斜問題。

如果縮減分割槽,一般就採用coalesce, 如果擴大分割槽,就採用repartition

12)   sortby

def sortby[k](

f: (t) => k,  //函式處理函式

ascending: boolean = true,   //true為預設值公升序   false 降序

numpartitions: int = this.partitions.length)   //分割槽數

(implicit ord: ordering[k], ctag: classtag[k]): rdd[t]

該操作用於排序資料。在排序之前,可以將資料通過f函式進行處理,之後按照f函式處理的結果進行排序,預設為正序排列。排序後新產生的rdd的分割槽數與原rdd的分割槽數一致。

val datardd = sparkcontext.makerdd(list(

1,2,3,4,1,2

),2)

val datardd1 = datardd.sortby(num=>num, false, 4)

RDD的轉換運算元(Value型別)

value型別 map 每次處理一條資料。作用 將每乙個分割槽形成乙個陣列,形成新的rdd型別時rdd array t 需求 建立乙個4個分割槽的rdd,並將每個分割槽的資料放到乙個陣列 作用 分組,按照傳入函式的返回值進行分組。將相同的key對應的值放入乙個迭代器。需求 建立乙個rdd,按照元素模...

RDD運算元怎麼區分轉換運算元和行動運算元

textfile 既不是transformation 也不是 action 它是為生成rdd前做準備 運算元 指的就是rdd上的方法。spark中的運算元分為2類 1 轉換運算元 transformation 由rrd 呼叫方法 返回乙個新的rdd 一直存在drive中因為沒生成task 特點 生成...

RDD轉換運算元和行動運算元的區別

textfile 既不是transformation 也不是 action 它是為生成rdd前做準備 運算元 指的就是rdd上的方法。spark中的運算元分為2類 1 轉換運算元 transformation 由rrd 呼叫方法 返回乙個新的rdd 一直存在drive中因為沒生成task 特點 生成...