spark 示例 連線操作

2022-05-22 22:51:10 字數 1782 閱讀 6706

我們有這樣兩個檔案

任務:找出使用者評分平均值大於4的電影。

我們看兩個檔案結果,第乙個檔案有電影的id和名字,第二個檔案有電影的id和所有使用者的評分

對於任務結果所需要的資料為電影id,電影名字,平均評分。平均評分用所有使用者評分總和/使用者數來求出

1.我們先計算電影的評分

(1)先讀取電影評分檔案

(2)取資料

我們看到每行的資料是通過::來進行連線的,然後我們需要的是第二列的電影id以及第二列的評分。

我們把兩個有用的資料取出來,組成鍵值對的形式。

為什麼要組成鍵值對的形式?

資料中每個使用者的對電影的評分都是分開的,所以我們需要對電影id進行分組操作,把所有評分分組。

之前示例中我們知道groupbykey能進行分組,同時還能把所有相同key的資料組合成乙個集合。

當我們把所有資料集合之後就很容易操作計算了。

所以我們把資料組合成為《電影id,評分》這樣的鍵值對的形式。

3.分組計算平均評分

我們看到我們分組之後,所有相同電影的不同使用者的評分都被收集到了乙個集合中。

那麼如何計算平均評分呢?評分總分 / 評分個數 = 平均評分

scala集合提供了sum方法來可以計算集合總和,提供了size方法來計算資料條數。

正好不用我們額外去求了,如果集合沒有定義方法,我們也可以遍歷後計算得出要求的值。

2.在取電影id和電影名

我們檢視資料結構,資料是通過::連線的,對我們有用的資料為第一列電影id和第二列電影名稱

3.通過電影id連線

我們把我們所有需要的資料都取出來了,接下來進行連線就可以了。

但是,我們連線需要把電影id作為連線的key。

我們需要的結果為(id,name,score)

如果我們直接對id進行連線的話,我們連線出來的結果只有(name,score)缺少了id

所以我們需要再次對資料進行處理,我們通過.keyby()方法新生成乙個key,同時value為原始的資料

然後我們再進行連線操作,注意join連線操作是內鏈結,

連線後的key是連線鍵,value為所有相同key的集合,可以通過_2._x來進行訪問

4.過濾求出平均評分大於4的記錄

Spark 傾斜連線

資料傾斜出現的原因 平行計算中,我們總希望分配的每乙個任務 task 都能以相似的粒度來切分,且完成時間相差不大。但是由於集群中的硬體和應用的型別不同 切分的資料大小不一,總會導致部分任務極大地拖慢了整個任務的完成時間,資料傾斜原因如下 資料傾斜的表現 任務進度長時間維持,檢視任務監控頁面,由於其處...

使用Python寫spark 示例

python寫spark我認為唯一的理由就是 你要做資料探勘,ai相關的工作。因為很多做數挖的他們的基礎語言都是python,他們如果重新學scala比較耗時,而且,python他的強大類庫是他的優勢,很多演算法庫只有python有。python的安裝 解壓python包,在環境變數裡面配上bin的...

Spark操作 控制操作

cache和persist操作都是對rdd進行持久化,其中cache是persist採用memory only儲存級別時的乙個特例,scala var rdd sc.textfile users lyf desktop data.txt scala rdd.cache 第一次計算行數,這裡只能從本地...