spark使用pandasdataframe優化

2021-12-30 12:29:17 字數 2517 閱讀 9508

pandas

spark

工作方式

單機,無法處理大量資料

分布式,能處理大量資料

儲存方式

單機快取

可以呼叫 persist/cache 分布式快取

是否可變

是 否

index索引

自動建立

無索引

行結構pandas.series

pyspark.sql.row

列結構pandas.series

pyspark.sql.column

允許列重名

否 是pandas dataframe 無法支援大量資料的計算,可以嘗試 spark df 來解決這個問題。

優化前import xgboost as xgb

import pandas as pd

import numpy as np

# 載入模型

bst = xgb.booster()

bst.load_model("***.model")

# 變數列表

var_list=[...]

df.rdd.map(lambda x : cal_xgb_score(x,var_list,ntree_limit=304)).write.todf()

# 計算分數

def cal_xgb_score(x,var_list,ntree_limit=50):

feature_count = len(var_list)

x1 = pd.dataframe(np.array(x).reshape(1,feature_count),columns=var_list)

# 資料變化操作

y1 = transformfun(x1)

test_x = xgb.dmatrix(y1.drop(['mobile','mobile_md5'],xais=1),missing=float('nan'))

y1['score'] = bst.predict(test_x,ntree_limit=ntree_limit)

y2 = y1[['mobile','mobile_md5','score']]

return 每條資料都轉化為 pd,增加了額外開銷。

優化後:def cal_xgb_score(x,var_list,ntree_limit=50):

feature_count = len(var_list)

//將 iterator 轉為list

x1 = pd.dataframe(list(x),columns=var_list)

...//將 pdf 轉為字典

return y1[['mobile','mobile_md5','score']].to_dict(orient='record')優化前:df.topandas()優化後:import pandas as pd

def _map_to_pandas(rdds):

return [pd.dataframe(list(rdds))]

def topandas(df, n_partitions=none):

if n_partitions is not none: df = df.repartition(n_partitions)

df_pand = df.rdd.mappartitions(_map_to_pandas).collect()

df_pand = pd.concat(df_pand)

df_pand.columns = df.columns

return df_pand

# 98列,22w行,型別 array/string/long/int,分割槽 200

df = spark.sql("...").sample(false,0.002)

df.cache()

df.count()

# 原生的 topandas 方法

%timeit df.topandas()

# 分布式的 topandas

%timeit topandas(df)

#使用 apache arrow,spark 版本2.3以上

spark.sql("set spark.sql.execution.arrow.enabled=true")

%timeit df.topandas()一. xgboost **

單條資料處理速度從 120 record / min 提高到 3278 record / min

tips: 如果乙個分割槽資料量過大將會導致 executor oom

二. spark dataframe 轉 pandas dataframe

type

cost (seconds)

native topandas

12 distributed topandas

5.91

arrow topandas

2.52

topandas 返回的資料歸根結底還是快取在 driver 的記憶體中的,不建議返回過大的資料。

Spark簡單使用

spark的乙個主要特點就是可以在記憶體中使用,因此他的計算速度比較快。在初學之前按照 quick start.html 中的示例來做一遍。先來初步理解一下操作流程。1.首先是搭建spark,網上有很多教程,cmd中最後執行pyspark 我們首先來分析spark資料夾中的 readme.md 檔案...

spark基本使用

啟動pysparkcd usr local spark bin pyspark統計文字的行數lines sc.textfile file usr local spark readme.md lines.count rdd的persisit方法會將該rdd物件持久化到記憶體中,對於可能會被重複呼叫的r...

spark使用parallelize方法建立RDD

通過呼叫sparkcontext的parallelize方法,在乙個已經存在的scala集合上建立的 乙個seq物件 集合的物件將會被拷貝,建立出乙個可以被並行操作的分布式資料集。python view plain copy data 1,2,3,4,5 distdata sc.paralleliz...