零基礎入門大資料之spark中rdd部分運算元詳解

2021-09-02 02:35:24 字數 3341 閱讀 6718

我們知道,spark中乙個重要的資料結構是rdd,這是一種並行集合的資料格式,大多數操作都是圍繞著rdd來的,rdd裡面擁有眾多的方法可以呼叫從而實現各種各樣的功能,那麼通常情況下我們讀入的資料來源並非rdd格式的,如何轉換為rdd呢?

乙個基本的方法是初始化,或者格式化操作函式parallelize。

比如乙個陣列array(1,2,3,4,5),經過parallelize後就變成了rdd格式的陣列。

scala> val d = array(1,2,3,4,5)

d: array[int] = array(1, 2, 3, 4, 5)

scala> val rdd_d = sc.parallelize(d)

rdd_d: org.apache.spark.rdd.rdd[int] = parallelcollectionrdd[2] at parallelize at :27

rdd的初始化parallelize函式可以接受兩個引數,上面省略了第二個預設引數,這個引數是分片個數slices,表示資料集切分的份數。乙份資料存在某台機器上的時候,可以指定切分的份數,可以想象,切分的越多,處理起來因為是並行的,速度越快,當然這是要佔資源的,當資料小的時候完全沒必要。典型地,你可以在集群的每個cpu上分布2-4個slices. 一般來說,spark會嘗試根據集群的狀況,來自動設定slices的數目。也可以通過傳遞給parallelize的第二個引數來進行手動設定,例如:sc.parallelize(data, 10))。

再來看乙個稍複雜的函式:aggregate。這是乙個比較底層的使用也很廣泛的函式,字面意思就是聚合的意思。不過它有兩層聚集:第一層,對分片的資料進行聚集,這裡的片就是上面的slices,第二層,對聚集的結果再進行聚集。看一下函式原型:

def aggregate[u: classtag](zerovalue: u)(seqop: (u, t) => u, combop: (u, u) => u): u)
這個函式需要初始化乙個值zerovalue。第一層聚合seqop,第二層聚合combop。

看乙個例子:

scala> val d = array(1,2,3,4,5,6)

scala> val rdd_d = sc.parallelize(d,2)

scala> val res = rdd_d.aggregate(0)(math.max(_,_),_+_)

res: int = 9

解釋一下:

1)首先將一組陣列分成兩片儲存起來:parallelize(d,2);預設是平均分,也就是1,2,3一片,4,5,6一片

2)對每一片執行取最大值操作,因為是聚合函式,所以輸入是兩個,輸出是乙個的函式,math.max(_,_)就是這種,下劃線代表所有元素執行。

可以看到這一步過後每一片的結果就是3,6。

3)之後對3,6執行相加的操作得到9.

一般來說這兩個函式設定成一樣的,比如,就是取最大值,那麼第二個函式也可以變成math.max(_,_)

cache是將rdd的結果暫時存放在記憶體裡面,方便後面需要用到這個rdd的時候不用再計算。

我們知道,spark裡面包含兩種運算運算元,一種是轉換運算元,一種是行為運算元,轉換運算元再spark裡面是惰性計算的,什麼意思呢?就是你寫了**,但是實際上程式執行到這一步並沒有實際發生計算,只有碰到了行為運算元才算正兒八經的計算。轉換運算元,就好比畫了計算流程圖一樣,只是單純的框架而已。這讓我想起了tensorflow裡面的一些運算也是這樣,果然都是一家的程式設計師,思路都完美繼承。

說遠了,這和cache運算元有什麼關係呢?舉個例子,假設資料a經過三層map變成了d,也就是a->b->c->d,a到d之間都是轉換操作,這個時候如果某個計算需要用到d,那麼就會把a到d的過程重新走一遍,因為d在一次計算以後不會儲存在記憶體裡面的,這就導致了乙個嚴重的問題就是需要重複計算很多東西。如果d被程式在不同地方多次呼叫的話將帶來效能的下降。這個時候有沒有辦法把d第一次被計算的結果儲存起來呢,有,這就是cache方法,起到快取的作用。

什麼是笛卡爾操作?就是兩個集合中的元素分別兩兩排列組合,舉個例子:

val s1 = sc.parallelize(array(1,2,3,4,5))

val s2 = sc.parallelize(array("a","b","c"))

val res = s1.cartesian(s2)

res.collect()

>>> array[(int, string)] = array((1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c), (4,a), (4,b), (4,c), (5,a), (5,b), (5,c))

看例子非常容易理解。笛卡爾操作在某些時候還是很有效的。

顧名思義,去掉rdd中重疊的元素。實際中我經常也會用到,比如想把幾個rdd進行合併起來,可以用union方法,但是呢裡面會有重複的,這個時候再接這個函式即可。如下:

val s1 = sc.parallelize(array(1,2,3,4,5))

val s2 = sc.parallelize(array(4,5,6,7,8))

val res = s1.union(s2).distinct()

res.collect()

>>> array[int] = array(1, 2, 3, 4, 5, 6, 7, 8)

這個方法可以說非常的重要了,靈活運用可以實現非常多的功能。filter方法就是過濾掉rdd中滿足一定條件的rdd,filter函式裡面可以定義各種過濾函式。比如下面:

val s1 = sc.parallelize(array(1,2,3,4,5))

val res = s1.filter(x => x>3)

res.collect()

>>> array[int] = array(4, 5)

keyby方法是為rdd中的每個資料額外增加乙個key構成(key,value)的資料對,進而這種結構的資料可以使用(key,value)專門的一些聚合函式,這些函式在以前的文章中記錄過。keyby的方法也比較簡單,舉個例子就明白了:

val s1 = sc.parallelize(array(1,2,3,4,5))

val res = s1.keyby(x => x*x)

res.collect()

>>> array[(int, int)] = array((1,1), (4,2), (9,3), (16,4), (25,5))

這裡將rdd中每個數的平方值當作自己的key,生成的結果可以看到。所以keyby只是生成對應元素key。

本文暫時記錄這麼多方法吧。

大資料零基礎入門學習之Hadoop技術優缺點

hadoop的優點 1 hadoop具有按位儲存和處理資料能力的高可靠性。2 hadoop通過可用的計算機集群分配資料,完成儲存和計算任務,這些集群可以方便地擴充套件到數以千計的節點中,具有高擴充套件性。3 hadoop能夠在節點之間進行動態地移動資料,並保證各個節點的動態平衡,處理速度非常快,具有...

大資料零基礎入門學習之Hadoop技術優缺點

hadoop的優點 1 hadoop具有按位儲存和處理資料能力的高可靠性。2 hadoop通過可用的計算機集群分配資料,完成儲存和計算任務,這些集群可以方便地擴充套件到數以千計的節點中,具有高擴充套件性。3 hadoop能夠在節點之間進行動態地移動資料,並保證各個節點的動態平衡,處理速度非常快,具有...

Python零基礎入門之函式

函式的命名空間和作用域 函式的三類命名空間 內建 全域性 區域性 兩大作用域 全域性 內建和全域性命名空間都屬於全域性作用域 區域性 區域性命名空間屬於區域性作用域 什麼是作用域鏈?就是由外而內的命名空間中的中的變數的生存週期都是就近原則 全域性作用域 大區域性作用域 小區域性作用域 函式的兩大引數...