spark中的轉換運算元1

2021-10-24 10:18:34 字數 4621 閱讀 9024

package test

import org.apache.spark.rdd.rdd

import org.apache.spark.

import scala.util.random

object testforyiele

//建立乙個rdd,型別是int

val listrdd: rdd[

int]

= context.makerdd(res)

//map運算元,例如在listrdd集合中每個元素的字尾加入字串abc

val maprdd: rdd[

string

]= listrdd.map(_ +

"abc"

) maprdd.collect(

).foreach(datas => print(datas +

"\t"))

}}

val listrdd: rdd[array[

any]

]= context.makerdd(array(array(

"a",1,

2,45)

,array(4,

7,18,

"oop"

,"om"))

)//注意型別 rdd[any]一維與rdd[array[any]]二維達到了降維效果,這運算元也叫扁平化操作

val flatmaprdd: rdd[

any]

= listrdd.flatmap(x=>x)

val listrdd: rdd[

any]

= context.makerdd(list(

"sbv"

,"om"

,"shuffle",3

,4))

// f: iterator[t] => iterator[u]

// 遍歷每乙個分割槽(將乙個乙個分割槽看作乙個整體進行邏輯處理)

// 效率比map運算元要快,隱患是可能會記憶體溢位

val partitionsrdd: rdd[

string

]"s"

))

//生成6個[0,9]之間的整數

var res =

for(i <-

1 to 6

)yield

val listrdd: rdd[

any]

= context.makerdd(res,2)

//f: (int, iterator[t]) => iterator[u],獲取資料在哪乙個分割槽,

//二元組,引數一是分割槽號(int型別),引數二是資料(iterator[int]型別,根據listrdd的rdd型別推斷

val partitionswithindexrdd: rdd[

(any

,string)]

case

(nums, datas)

=>

datas.map(

(_,"分割槽:"

+ nums)

)}

輸出:

(7,分割槽:0)

(9,分割槽:0)

(1,分割槽:0)

(0,分割槽:1)

(5,分割槽:1)

(4,分割槽:1)

4)glom

glom函式將每個分割槽形成乙個陣列,內部實現是返回的glommedrdd。

//生成20個[0,9]之間的整數

var res =

for(i <-

1 to 20

)yield

val listrdd: rdd[

int]

= context.makerdd(res)

//將乙個分割槽的資料放入到array陣列中

val glomrdd: rdd[array[

int]

]= listrdd.glom(

)glomrdd.collect(

).foreach(datas=>println(datas.mkstring(

",")

))

輸出:

8,31,2,8

1,00,2,1

5,09,4,3

2,76,5,2

7) groupby

groupby :將元素通過函式生成相應的 key,資料就轉化為 key-value 格式,之後將 key 相同的元素分為一組。

//隨機生成30個[0-99]之間的整數並以vector集合型別進行返回給res

val res =

for(i <-

1 to 30

)yield

val listrdd: rdd[

int]

= context.makerdd(res)

//按照指定的規則進行分組,將listrdd集合中每乙個元素模以5,結果相同則視為乙個key

//返回乙個二元組 int 型別指的是返回模以5的結果,iterable[int])指的是返回相同key(模以5的結果)的集合

val grouprdd: rdd[

(int

, iterable[

int])]

= listrdd.groupby(x => x %5)

grouprdd.collect(

).foreach(println)

輸出:

(0,compactbuffer(15, 85, 70))

(1,compactbuffer(41, 6, 31, 41, 96, 46))

(2,compactbuffer(37, 7, 82, 37, 17, 72, 57))

(3,compactbuffer(8, 98, 98, 78, 8))

(4,compactbuffer(54, 34, 4, 24, 54, 59, 44, 44, 94))

8) filter

filter 函式功能是對元素進行過濾,對每個 元 素 應 用 f 函 數, 返 回 值 為 true 的 元 素 在rdd 中保留,返回值為 false 的元素將被過濾掉。 內 部 實 現 相 當 於 生 成 filteredrdd(this,sc.clean(f))

val listrdd: rdd[string] = context.makerdd(array(「tom」,「lisa」,「python」,「sparck」,「scala」))

//將listrdd集合中包含s字元取出,其他過濾掉

val filterrdd: rdd[string] = listrdd.filter(x => x.contains(「s」))

filterrdd.collect().foreach(println)

輸出:sparck

scala

9)distinct

distinct將rdd中的元素進行去重操作.

val listrdd: rdd[

any]

= context.makerdd(array(2,

1,3,

2,1,

"tom"

,"python"

,"python"))

val distinctrdd: rdd[

any]

= listrdd.distinct(

) distinctrdd.collect(

).foreach(println)

輸出:1

tom2

3python

結果貌似與我們想得不一樣,distinct操作有shuffle的讀寫操作,那就有資料重組(洗牌)操作,實際上我們有這個轉換操作時,它必須等待所有分割槽的資料全部讀完,才會進行這個轉換操作,不同於其他轉換運算元,map運算元不需要等待下乙個分割槽,可以這樣說map運算元有並行機制,但distinct運算元沒有並行機制(我們也必須等待所有分割槽的資料讀完,才能去重)

11) sample

sample 將 rdd 這個集合內的元素進行取樣,獲取所有元素的子集。使用者可以設定是否有放回的抽樣、百分比、隨機種子,進而決定取樣方式。內部實現是生成 sampledrdd(withreplacement, fraction, seed)。

val listrdd: rdd[

int]

= context.makerdd(

1 to 20

)//隨機抽樣,引數一,是否放回資料,true放回,false不放回

//fraction引數並不是總體的百分比量,但它的引數越大,抽樣數量越大,

//seed表示乙個種子(這點可以再次執行會和上一次執行結

//果相同,執行幾次也是一樣,偽隨機數)

val samplerdd: rdd[

int]

= listrdd.sample(

false

,0.5,5

)

sparkStreaming轉換運算元

map 集群 nc 埠 9000 可以修改 替換 字 flatmap 切分壓平 filter repartition union合併 local 當只有兩個的時候 只有乙個分割槽 另乙個處理資料集 count reduce join 和 cogroup用兩個佇列join 以上運算元都是無狀態的 各處...

RDD轉換運算元和行動運算元的區別

textfile 既不是transformation 也不是 action 它是為生成rdd前做準備 運算元 指的就是rdd上的方法。spark中的運算元分為2類 1 轉換運算元 transformation 由rrd 呼叫方法 返回乙個新的rdd 一直存在drive中因為沒生成task 特點 生成...

RDD運算元怎麼區分轉換運算元和行動運算元

textfile 既不是transformation 也不是 action 它是為生成rdd前做準備 運算元 指的就是rdd上的方法。spark中的運算元分為2類 1 轉換運算元 transformation 由rrd 呼叫方法 返回乙個新的rdd 一直存在drive中因為沒生成task 特點 生成...