spark 讀取 hdfs 資料分割槽規則

2021-09-07 19:56:53 字數 1785 閱讀 5616

下文以讀取 parquet 檔案 / parquet hive table 為例:

hive metastore 和 parquet 轉化的方式通過 spark.sql.hive.convertmetastoreparquet 控制,預設為 true。

如果設定為 true ,會使用 org.apache.spark.sql.execution.filesourcescanexec ,否則會使用 org.apache.spark.sql.hive.execution.hivetablescanexec。

filesourcescanexec

前者對分割槽規則做了一些優化,如果 檔案是:

沒有分桶的情況

分割槽大小計算公式:

bytespercore = totalbytes / defaultparallelism

maxsplitbytes = math.min(defaultmaxsplitbytes, math.max(opencostinbytes, bytespercore))12

defaultmaxsplitbytes:spark.sql.files.maxpartitionbytes,預設為128m,每個分割槽讀取的最大資料量

opencostinbytes: spark.sql.files.opencostinbytes,預設為4m,小於這個大小的檔案將會合併到乙個分割槽,可以理解為每個分割槽的最小量,避免碎檔案造成的大量碎片任務。

defaultparallelism: spark.default.parallelism,yarn預設為應用cores數量或2。

bytespercore:資料總大小 / defaultparallelism

eg. 讀入乙份 2048m 大小的資料

tip: partitionsize的計算過程簡化,實際上會先對讀入的每個分割槽按maxsplitbytes做切割,切割完後如果的小檔案如果大小不足maxsplitbytes的,會合併到乙個partition,直到大小 > maxsplitbytes。

//如果 spark.default.parallelism 設定為 1000,最終的分割槽數量是 512,每個分割槽大小為4m

maxsplitbytes = math.min(128m, math.max(4m, 2m))

partitionsize = 2048 / 4 = 512 

//如果 spark.default.parallelism 設定為 100, 最終的分割槽數量是 100,每個分割槽大小為20.48m

maxsplitbytes = math.min(128m, math.max(4m, 20.48m))

partitionsize = 2048 / 20.48 = 100     

//如果 spark.default.parallelism 設定為 10, 最終的分割槽數量是 16,每個分割槽大小為128m

maxsplitbytes = math.min(128m, math.max(4m, 204.8m))

partitionsize = 2048 / 128 = 16     12

3456

78910

11分桶的情況下:

分割槽數取決於桶的數量。

hivetablescanexec

通過檔案數量,大小進行分割槽。

eg. 讀入乙份 2048m 大小的資料,hdfs 塊大小設定為 128m

該目錄有1000個小檔案,則會生成1000個partition。

如果只有1個檔案,則會生成 16 個分割槽。

如果有乙個大檔案1024m,其餘 999 個檔案共 1024m,則會生成 1009 個分割槽

spark 讀取 hdfs 資料分割槽規則

下文以讀取 parquet 檔案 parquet hive table 為例 hive metastore 和 parquet 轉化的方式通過 spark.sql.hive.convertmetastoreparquet 控制,預設為 true。如果設定為 true 會使用 org.apache.s...

Spark讀取hdfs分片資料原始碼剖析

val lines rdd string sparkcontext.textfile args 0 我們通過 sparkcontext.textfile來讀取hdfs中某一資料 正常來說,應該是有幾個檔案就建立幾個分割槽 然而我們dubug的時候發現,有時候3個檔案卻建立了4個分割槽 其實這是由於s...

python讀取hdfs資料

載入包from hdfs.client import client self.filename user hdfs read.txt 讀取hdfs檔案內容,將每行存入陣列返回def read hdfs file self with client.read samples.csv encoding u...