spark讀取MySQL的方式及併發度優化

2021-09-25 01:45:35 字數 3209 閱讀 1772

前段時間用sparksession讀取mysql的乙個表的時候,出現耗時長,頻繁出現oom等情況,去網上查詢了一下,是因為用的預設讀取jdbc方式,單執行緒任務重,所以出現耗時長,oom等現象.這時候需要提高讀取的併發度.現簡單記錄下.

看sparsession dataframereader原始碼,讀取jdbc有三個方法過載.

單partition,無併發def jdbc(url: string, table: string, properties: properties): dataframe

使用,校驗

val url: string = "jdbc:mysql://localhost:3306/testdb"

val table = "students"

//連線引數

val properties: properties = new properties()

properties.setproperty("username","root")

properties.setproperty("password","123456")

properties.setproperty("driver","com.mysql.jdbc.driver")

val tb_table: dataframe = sparksession.read.jdbc(url, table, properties)

檢視併發度tb_table.rdd.getnumpartitions #返回結果1

該操作的併發度為1,你所有的資料都會在乙個partition中進行操作,意味著無論你給的資源有多少,只有乙個task會執行任務,執行效率可想而之,並且在稍微大點的表中進行操作分分鐘就會oom.

2. 根據long型別字段分割槽

呼叫函式

def jdbc(

url: string,

table: string,

columnname: string, # 根據該欄位分割槽,需要為整形,比如id等

lowerbound: long, # 分割槽的下界

upperbound: long, # 分割槽的上界

numpartitions: int, # 分割槽的個數

connectionproperties: properties): dataframe

使用,校驗

val url: string = "jdbc:mysql://localhost:3306/testdb"

val table = "students"

val colname: string = "id"

val lowerbound = 1

val upperbound = 10000

val numpartions = 10

val properties: properties = new properties()

properties.setproperty("username","root")

properties.setproperty("password","123456")

properties.setproperty("driver","com.mysql.jdbc.driver")

val tb_table: dataframe = sparksession.read.jdbc(url, table,colname, lowerbound, upperbound, numpartions, properties)

檢視併發度tb_table.rdd.getnumpartitions #返回結果10

該操作將字段 colname 中1-10000000條資料分到10個partition中,使用很方便,缺點也很明顯,只能使用整形資料字段作為分割槽關鍵字

3. 根據任意型別字段分割槽

呼叫函式

jdbc(

url: string,

table: string,

predicates: array[string],

connectionproperties: properties): dataframe

使用,校驗

val url: string = "jdbc:mysql://localhost:3306/testdb"

val table = "students"

/*** 將9月16-12月15三個月的資料取出,按時間分為6個partition

* 為了減少事例**,這裡的時間都是寫死的

* sbirthday 為時間字段

*/ val predicates =

array(

"2015-09-16" -> "2015-09-30",

"2015-10-01" -> "2015-10-15",

"2015-10-16" -> "2015-10-31",

"2015-11-01" -> "2015-11-14",

"2015-11-15" -> "2015-11-30",

"2015-12-01" -> "2015-12-15"

).map

val properties: properties = new properties()

properties.setproperty("username","root")

properties.setproperty("password","123456")

properties.setproperty("driver","com.mysql.jdbc.driver")

val tb_table: dataframe = sparksession.read.jdbc(url, table,predicates, properties)

檢視併發度tb_table.rdd.getnumpartitions #結果為6

該操作的每個分割槽資料都由該段時間的分割槽組成,這種方式適合各種場景,較為推薦。

mysql單partition,大表極容易出現卡死n分鐘oom情況.

分成多個partition後,已極大情況避免該情況發生,但是partition設定過高,大量partition同時讀取資料庫,也可能將資料庫弄掛,需要注意.

參考: spark jdbc(mysql) 讀取併發度優化

Spark讀取檔案

spark預設讀取的是hdfs上的檔案。如果讀取本地檔案,則需要加file usr local spark readme.md。測試時候發現,本地檔案必須在spark的安裝路徑內部或者平行 讀取hdfs檔案,可以這樣指定路徑 hdfs ns1 tmp test.txt。如果不指定任何字首,則使用hd...

Spark讀取檔案

spark預設讀取的是hdfs上的檔案。如果讀取本地檔案,則需要加file usr local spark readme.md。測試時候發現,本地檔案必須在spark的安裝路徑內部或者平行 讀取hdfs檔案,可以這樣指定路徑 hdfs ns1 tmp test.txt。如果不指定任何字首,則使用hd...

spark讀取json,parquet檔案

spark支援的一些常見的格式 文字檔案,無任何的格式 json檔案,半結構化 parquet,一種流行的列式儲存格式 sequencefile,一種用於key value的hadoop檔案格式,如果需要讀hdfs資料的話,通常走hive的比較多。在企業中很少用,原因是寫sql的時候,能用spark...