Spark常用函式(原始碼閱讀六)

2022-07-08 16:36:23 字數 2355 閱讀 4009

原始碼層面整理下我們常用的操作rdd資料處理與分析的函式,從而能更好的應用於工作中。

連線hbase,讀取hbase的過程,首先**如下:

def tableinitbytime(sc : sparkcontext,tablename : string,columns : string,fromdate: date,todate : date) : rdd[(immutablebyteswritable,result)] =

val hbaserdd =sc.newapihadooprdd(configuration,classof[tableinputformat],classof[org.apache.hadoop.hbase.io.immutablebyteswritable],classof[org.apache.hadoop.hbase.client.result])

system.out.println(hbaserdd.count())

hbaserdd

}

我們來一點一點解析整個過程。

1、val configuration = hbaseconfiguration.create()

這個用過hbase的夥伴們都知道,載入配置檔案,其實呼叫的是hbase的api,返回的rdd是個configuration。載入的配置檔案資訊包含core-default.xml,core-site.xml,mapred-default.xml等。載入原始碼如下:

2、隨之設定表名資訊,並宣告scan物件,並且set讀取的列有哪些,隨後呼叫newapihadooprdd,載入指定hbase的資料,當然你可以加上各種filter。那麼下來 我們看看newapihadooprdd是幹了什麼呢?我們來閱讀下裡面的實現。

可以看到我們呼叫api,其實就是乙個input過程,建立了乙個newhadooprdd物件,那麼後台是乙個input資料隨後轉化為rdd的過程。節點之間的資料傳輸是通過序列化資料,通過broadcast傳輸的conf資訊。

3、隨之進行count驗證操作,查詢資料的partition個數,hbase的資料當然是以block塊的形式儲存於hdfs。

5、下來我們看下filter函式幹了什麼呢?

val calculaterdd = transrdd.filter(_._1 != null).filter(_._2 != null).filter(_._3 != null).filter(_._4 !=null

)

//map轉換為字段((身份證號,經度(保留兩位小數),緯度(保留兩位小數),**號碼,時間段標誌),1),最後的1代表出現一次,用於後邊做累加

6、隨後我們要進行相同key值的合併,那麼,我們開始使用reducebykey:

//

按key做reduce,value做累加

底層呼叫了combinebykeywithclasstag,這裡的partitioner引數我們之所以沒有傳入,是因為在map的rdd中已包含該rdd的partitioner的資訊。它內部的實現將map的結果呼叫了require先進行merge,隨後建立shufflerdd.shufflerdd就是最終reduce後的rdd。然後看不懂了。。。因為需要與整個流程相結合。所以後續繼續深入~

Spark常用函式(原始碼閱讀六)

原始碼層面整理下我們常用的操作rdd資料處理與分析的函式,從而能更好的應用於工作中。連線hbase,讀取hbase的過程,首先 如下 def tableinitbytime sc sparkcontext,tablename string,columns string,fromdate date,t...

spark原始碼剖析 RDD相關原始碼閱讀筆記

最好的原始碼閱讀方法就是除錯,沒有之一 之前其實有閱讀過rdd相關的原始碼,最近學習過程中發現在之前原本閱讀過的模組中有一些 關節 並沒有打通,所以想通過除錯的方式來更細緻得學習原始碼。本文為編寫測試用例並除錯rdd相關模組的筆記,並沒有列出具體的除錯過程,僅列出結論以做備忘,特別是那些比較容易忽略...

Spark原始碼閱讀之HistoryServer

概述 historyserver服務可以讓使用者通過spark ui介面,檢視歷史應用 已經執行完的應用 的執行細節,比如job資訊 stage資訊 task資訊等,該功能是基於spark eventlogs日誌檔案的,所以必須開啟eventlogs日誌開關,關於日誌開關的開啟和historyser...