Spark Operations: Transformations (1)


Basic transformation operations

Key-value transformation operations

map: applies a given function to every element of the RDD, producing a new RDD.

scala> var rdd = sc.textFile("/Users/lyf/Desktop/test/data1.txt")

scala> rdd.map(line => line.split(" ")).collect

res16: Array[Array[String]] = Array(Array(hello, world), Array(hello, tom), Array(hello, jerry))
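The examples in this post assume the spark-shell, where sc is already defined. For reference, a minimal standalone sketch of the same map call is shown below; the SparkSession setup, master, appName, and the in-memory input (standing in for data1.txt) are assumptions for illustration, not part of the original example.

import org.apache.spark.sql.SparkSession

object MapExample {
  def main(args: Array[String]): Unit = {
    // Local SparkSession; master and appName are placeholder assumptions
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("MapExample")
      .getOrCreate()
    val sc = spark.sparkContext

    // Same idea as the text-file example above, but the input is built
    // in memory so the sketch is self-contained
    val rdd = sc.parallelize(Seq("hello world", "hello tom", "hello jerry"))
    val mapped = rdd.map(line => line.split(" "))   // RDD[Array[String]]
    mapped.collect().foreach(arr => println(arr.mkString(",")))

    spark.stop()
  }
}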

distinct: removes duplicate elements from the RDD, returning a new RDD in which every element appears only once.

scala> var rdd = sc.parallelize(List(1,2,2,3,3,3,4,5))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> rdd.distinct.collect

res18: Array[Int] = Array(4, 1, 5, 2, 3)

// distinct can also take a numPartitions argument that sets the number of partitions of the result

scala> var rdd = sc.parallelize(List(1,2,2,3,3,3,4,5))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> var rddDistinct = rdd.distinct

scala> rddDistinct.partitions.size

res21: Int = 4

scala> var rddDistinct = rdd.distinct(3)

scala> rddDistinct.partitions.size

res22: Int = 3

flatMap: like map, but each input element can be mapped to zero or more output elements, and the results are flattened into a single RDD.

scala> var rdd = sc.textFile("/Users/lyf/Desktop/test/data1.txt")

scala> rdd.flatMap(line => line.split(" ")).collect

res23: Array[String] = Array(hello, world, hello, tom, hello, jerry)
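To make the difference from map concrete, here is a small sketch that can be pasted into the spark-shell (where sc is predefined); the in-memory input lines are an assumption standing in for data1.txt.

// sc is predefined in spark-shell
val lines = sc.parallelize(Seq("hello world", "hello tom", "hello jerry"))

// map keeps one output element per input line: RDD[Array[String]]
val perLine = lines.map(_.split(" "))
// collect => Array(Array(hello, world), Array(hello, tom), Array(hello, jerry))

// flatMap flattens the split results into a single RDD[String]
val words = lines.flatMap(_.split(" "))
// collect => Array(hello, world, hello, tom, hello, jerry)
println(words.collect().mkString(", "))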

coalesce and repartition both re-partition an RDD. coalesce takes the target number of partitions as its first argument and a shuffle flag as its second (default false); when a shuffle is performed, the data is redistributed with a HashPartitioner. repartition is simply coalesce with shuffle = true.

scala> var rdd = sc.textFile("/Users/lyf/Desktop/test/data1.txt")

scala> rdd.partitions.size

res24: Int = 2

scala> var rdd_1 = rdd.coalesce(1)

rdd_1: org.apache.spark.rdd.RDD[String] = CoalescedRDD[36] at coalesce at <console>:25

// If the target partition count is larger than the current one, the second argument must be true; otherwise the partition count stays unchanged

scala> var rdd_2 = rdd.coalesce(3)

rdd_2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[37] at coalesce at <console>:25

scala> rdd_2.partitions.size

res26: Int = 2

scala> var rdd_2 = rdd.coalesce(5, true)


scala> rdd_2.partitions.size

res28: Int = 5

scala> var rdd_3 = rdd.repartition(5)

scala> rdd_3.partitions.size

res29: Int = 5
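A common pattern, sketched below under the assumption of a spark-shell session (sc predefined): shrink the number of partitions without a shuffle after a selective filter, and use repartition (or coalesce with shuffle = true) when the partition count must grow or the data should be rebalanced.

// sc is predefined in spark-shell; the data size and 100 partitions are arbitrary assumptions
val bigRdd = sc.parallelize(1 to 1000000, 100)

// After a selective filter many partitions are nearly empty
val filtered = bigRdd.filter(_ % 997 == 0)

// Shrink the partition count cheaply: no shuffle, partitions are merged locally
val compact = filtered.coalesce(4)
println(compact.partitions.size)   // 4

// Growing the partition count requires a shuffle, so use repartition
val widened = compact.repartition(16)
println(widened.partitions.size)   // 16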

randomSplit: splits an RDD into several RDDs according to the given weights and returns them as an array; the higher a weight, the higher the probability that an element is assigned to the corresponding split.

scala> var rdd = sc.parallelize(1 to 10, 10)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at parallelize at <console>:24

// Split the original RDD into an array of new RDDs according to the weights

scala> var rddSplit = rdd.randomSplit(Array(1.0, 2.0, 3.0, 4.0))

scala> rddSplit.size

res30: Int = 4

scala> rddSplit(0).collect

res31: Array[Int] = Array()

scala> rddSplit(1).collect

res32: Array[Int] = Array(3, 8)

scala> rddSplit(2).collect

res33: Array[Int] = Array(1, 2, 9)

scala> rddSplit(3).collect

res34: Array[Int] = Array(4, 5, 6, 7, 10)
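Because the assignment is random, the splits differ from run to run; passing a seed makes them reproducible. A minimal sketch of a train/test split, assuming a spark-shell session (the 80/20 weights and the seed value are illustrative assumptions):

// sc is predefined in spark-shell
val data = sc.parallelize(1 to 100)

// randomSplit(weights, seed): the same seed gives the same split on the same data
val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 42L)

println(s"train: ${train.count()}, test: ${test.count()}")  // roughly 80 / 20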

glom: gathers all elements of type T within each partition into an Array[T], returning an RDD[Array[T]].

scala> var rdd = sc.parallelize(1 to 10, 4)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[52] at parallelize at <console>:24

scala> rdd.collect

res36: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> rdd.glom().collect

res37: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4, 5), Array(6, 7), Array(8, 9, 10))
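glom is handy for per-partition inspection or aggregation. The sketch below (spark-shell assumed, sc predefined) computes the maximum of each partition and then the global maximum.

// sc is predefined in spark-shell; 4 partitions as in the example above
val nums = sc.parallelize(1 to 10, 4)

// One Array[Int] per partition; reduce each array to its maximum
val perPartitionMax = nums.glom().map(arr => arr.max)
println(perPartitionMax.collect().mkString(", "))      // e.g. 2, 5, 7, 10

// Combine the per-partition maxima into a global maximum
println(perPartitionMax.reduce((a, b) => math.max(a, b)))   // 10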

union: returns the union of two RDDs; duplicate elements are not removed.

scala> var rdd1 = sc.makeRDD(1 to 3, 1)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at makeRDD at <console>:24

scala> var rdd2 = sc.makeRDD(2 to 5, 1)

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at makeRDD at <console>:24

scala> rdd1.union(rdd2).collect

res38: Array[Int] = Array(1, 2, 3, 2, 3, 4, 5)

intersection: returns the intersection of two RDDs; the result is deduplicated. The optional numPartitions parameter sets the number of partitions of the result, and partitioner specifies the partition function.

scala> rdd1.intersection(rdd2).collect

res39: Array[Int] = Array(3, 2)

subtract: returns the difference of the two RDDs (elements of rdd1 that do not appear in rdd2); duplicates are not removed.

scala> rdd1.subtract(rdd2).collect

res40: Array[Int] = Array(1)
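A small sketch (spark-shell assumed, sc predefined) that makes the deduplication behavior of the three set operations visible by using inputs that contain duplicates; the sample data is an assumption.

// sc is predefined in spark-shell
val a = sc.parallelize(Seq(1, 1, 2, 3))
val b = sc.parallelize(Seq(2, 3, 3, 4))

// union keeps duplicates: 1, 1, 2, 3, 2, 3, 3, 4
println(a.union(b).collect().mkString(", "))

// union + distinct gives a classical set union: 1, 2, 3, 4
println(a.union(b).distinct().collect().sorted.mkString(", "))

// intersection deduplicates its result: 2, 3
println(a.intersection(b).collect().sorted.mkString(", "))

// subtract keeps duplicates from the left side: 1, 1
println(a.subtract(b).collect().mkString(", "))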

References:

[1] 郭景瞻. 圖解Spark: 核心技術與案例實戰 [M]. 北京: 電子工業出版社, 2017.
