Spark入門RDD操作

2021-08-07 11:18:25 字數 3268 閱讀 5833

rdd(resilient distributed datasets),彈性分布式資料集,是spark中的抽象資料結構型別,任何資料在spark中都被表示為rdd.從程式設計的角度來看,rdd可以簡單看成是乙個陣列.和普通陣列的區別是,rdd中的資料是分割槽儲存的,這樣不同分割槽的資料就可以分布在不同的機器上,同時可以被並行處理。因此,spark應用程式所做的無非是把需要處理的資料轉換為rdd,然後對rdd進行一系列的變換和操作從而得到結果.spark簡單rdd入門操作。

啟動spark-shell

./bin/spark-shell //啟動乙個spark驅動器程式driver program。預設建立了乙個sparkcontext物件,是乙個sc的變數.

1.map

是對rdd中的每個元素都執行乙個指定的函式產生乙個新的rdd.任何原rdd中的元素在新rdd中都有且只有乙個元素與之對應.

(1)把原rdd中每個元素都乘以2來產生乙個新的rdd

scala> val a = sc.parallelize(1 to 9, 3)        

scala> val b = a.map(x => x*2) //只生成了1-9的九個數字

scala> a.collect

res10: array[int] = array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> b.collect

res11: array[int] = array(2, 4, 6, 8, 10, 12, 14, 16, 18)

(2)與上述的map方法的呼叫時相同的

scala> val textfile = spark.read

.textfile("readme.md")

scala> val textmap = textfile.map(line => line.split(" ").size)

2.reduce

是將rdd中元素兩兩傳遞給輸入函式,同時產生乙個新的值,新產生的值與rdd中下乙個元素再被傳遞給輸入函式直到最後只有乙個值為止.

2.1對rdd中的元素求和

scala> val c = sc.parallelize(1 to 10)

scala> c.reduce((x, y) => x + y)

res4: int = 55

scala> textfile.map

(line => line.split(" ").size).reduce

((a, b) => if (a > b) a else b)

3.map和reduce的使用

scala> textfile.map(line => line.split(" ").size).reduce

((a, b) => if (a > b) a else b)

res4: long = 15

4.flatmap

與map類似,區別是原rdd中的元素經map處理後只能生成乙個元素,而原rdd中的元素經flatmap處理後可生成多個元素來構建新rdd.

(1)對原rdd中的每個元素x產生y個元素

scala> val a = sc.parallelize(1

to4, 2)

scala> val b = a.flatmap(x => 1

to x) //生成了1和1,2和1,2,3和1,2,3,4十個數字

scala> b.collect

res12: array[int] = array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

呼叫flatmap將資料集轉換為字資料集,然後合併groupbykey並count計算檔案中的每字計數作為(string,long)對的資料集。要在shell中收集字數,我們可以呼叫collect.

scala> val wordcounts = textfile.flatmap(line => line.split(" ")).groupbykey(identity).count()

wordcounts: org.apache

.spark

.sql

.dataset[(string, long)] = [value: string, count(1): bigint]

scala> wordcounts.collect

res12: array[(string, long)] = array((online,1), (graphs,1), (["parallel,1), (["building,1), (thread,1), (documentation,3), (command,,2), (abbreviated,1),...)

5.標記快取

將我們的lineswithspark資料集標記為快取,當資料被重複訪問.

org.apache.spark.storage.storagelevel類的快取級別:

scala> lineswithspark.cache //cache 預設的快取級別與persist一致,都是storagelevel.memory_only,可以用unpersist來取消快取

res14: lineswithspark.type = [value: string]

scala> lineswithspark.count

res15: long = 20

scala> lineswithspark.count

res16: long = 20

6.spark rdd api 操作

6.1對鍊錶為list(1,2,3,3)的如下一元轉換操作

6.2對鍊錶為list(1,2,3)和list(3,4,5)的如下二元轉換操作

6.3對鍊錶為list(1,2,3,3)的如下行動操作

Spark 基礎及RDD基本操作

什麼是rdd rdd resilient distributed dataset 叫做分布式資料集,是spark中最基本的資料抽象,它代表乙個不可變 可分割槽 裡面的元素可平行計算的集合。rdd具有資料流模型的特點 自動容錯 位置感知性排程和可伸縮性。rdd允許使用者在執行多個查詢時顯式地將工作集快...

Spark程式設計模型 RDD

spark程式設計模型是彈性分布式資料集 resilient distributed dataset,rdd 是mapreduce模型的擴充套件和延伸 基於rdd機制實現了多類模型計算,如 1.迭代計算 2.互動式sql查詢 3.mapreduce rdd 4.流式資料處理。markdown 是一種...

spark 的RDD分割槽

rdd的倆種建立方 1.從集合中建立rdd,spark主要提供了兩種函式 parallelize和makerdd 使用parallelize 從集合建立 scala val rdd sc.parallelize array 1,2,3,4,5,6,7,8 使用makerdd 從集合建立 scala ...