Spark core詳解系列四

2021-10-01 05:18:40 字數 2911 閱讀 8169

要求:資料如下

a,1,

3a,2,

4b,1,

1

根據資料第一列統計得到如下結果

a,3,

7b,1,

1

用rdd實現。

實現功能核心**如下:

val input = sc.

parallelize

(list

(list

("a",1

,3),

list

("a",2

,4),

list

("b",1

,1))

)input.

map(x =

>).

reducebykey

((x, y)

=>).

map(x =

>

(x._1, x._2._1, x._2._2)

).collect.

foreach

(println)

要求:資料如下

"1000000,一起看|電視劇集|軍旅|士兵突擊,1,0"

,"1000000,一起看|電視劇集|軍旅|士兵突擊,1,1"

,"1000001,一起看|電視劇集|軍旅|我的團長我的團,1,1"

,

統計得到如下結果:

(

(1000000

,一起看),(

2,1)

)((1000000

,電視劇集),(

2,1)

)((1000000

,軍旅),(

2,1)

)((1000000

,士兵突擊),(

2,1)

)((1000001

,一起看),(

1,1)

)((1000001

,電視劇集),(

1,1)

)((1000001

,軍旅),(

1,1)

)((1000001

,我的團長我的團),(

1,1)

)

實現功能核心**如下:

val input = sc.

parallelize

(list

("1000000,一起看|電視劇集|軍旅|士兵突擊,1,0"

,"1000000,一起看|電視劇集|軍旅|士兵突擊,1,1"

,"1000001,一起看|電視劇集|軍旅|我的團長我的團,1,1"))

val processrdd = input.

flatmap

(x =

>

)processrdd.

reducebykey

((x, y)

=>

(x._1 + y._1, x._2 + y._2)

).collect.

foreach

(println)

processrdd.

groupbykey()

.mapvalues

(x =

>

).collect.

foreach

(println)

補充知識點

reducebykey和groupbykey對比兩個都是shuffle運算元,但reducebykey有預聚合功能,所以reducebykey的shuffle資料傳輸更小,生產上優先選擇reducebykey。兩者的分割槽器預設都是 hashpartitioner,分割槽數預設就是並行度。

要求:求組內 top n

日誌檔案資料形如:www.baidu.com,url1

思路一:利用 tolist 然後排序,take出topn,但是tolist存在安全隱患,適合小資料量。

val processrdd = input.

map(x =

>

)processrdd.

reducebykey

(_+_)

.groupby

(_._1._1)

.mapvalues

(x=>).

map(x=

>

(x._1,x._2(

0),x._2(1

))).

printinfo

()

思路二:分而治之

val sites = processrdd.

map(_._1._1)

.distinct()

.collect()

// collect將rdd轉化為陣列,因為兩個rdd不能巢狀

sites.

foreach

(x=>

)

補充知識點

1.rdd不支援巢狀,即不能使用如:rdd1.map(x => rdd2.values.count() * x)

解決辦法:將其中乙個rdd轉化成陣列,如使用 rdd1.collect(),但是collect運算元只適用於小資料量。

2.foreach運算元和map運算元區別:map返回新的rdd,且是transformation運算元;foreach無返回值,且是action運算元。

3.take是action運算元,返回結果是乙個array,不需要collect,直接可以 foreach(println)。

Spark core詳解系列二

collect 把rdd中所有元素返回到乙個陣列,返回到driver端的memory中。如非要檢視rdd中的資料 取出部分資料,或把rdd輸出到檔案系統。foreach rdd.foreach println rdd.foreachpartition partition partition.map ...

Solrj Java API呼叫詳解系列(四)

高階查詢部分的內容其實還有很多,詳細請參考 一 過濾查詢字段 有時查詢會的docs中可能不需要包含所有的field,只需要關注其中的幾個。那麼通過fl field list 引數的設定可以實現field的過濾。例如 solrquery query new solrquery query.setfie...

View系列 四 Layout 流程詳解

三 viewgroup 的 layout 過程 四 小結 在 view 的 measure 過程中,measure 分為兩種場景 即單一view 和 viewgroup 而 view 的 layout 過程與 measure 類似,也分為以下兩種場景。view的型別 layout 過程 單一的vie...