spark分布式執行xgboost

2021-10-23 09:06:09 字數 4358 閱讀 2134

# coding=utf-8

import os

os.environ[

'pyspark_submit_args']=

'--jars /data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-spark-0.90.jar,/data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-0.90.jar pyspark-shell'

from pyspark.sql import sparksession, sqlcontext

from pyspark import sparkconf, sparkcontext

conf = sparkconf(

).setmaster(

"yarn"

)"pyspark_xgboost_yarn"

)sc = sparkcontext(conf=conf)

'calculatinggeodistances'

).getorcreate(

)sqlcontext = sqlcontext(sparkcontext=sc)

from pyspark.sql.types import

*from pyspark.ml.feature import stringindexer, vectorassembler

from pyspark.ml import pipeline

# spark.sparkcontext.addpyfile("hdfs:///tmp/rd/lp/sparkxgb.zip")

schema = structtype(

[structfield(

"passengerid"

, doubletype())

, structfield(

"survived"

, doubletype())

, structfield(

"pclass"

, doubletype())

, structfield(

"name"

, stringtype())

, structfield(

"***"

, stringtype())

, structfield(

"age"

, doubletype())

, structfield(

"sibsp"

, doubletype())

, structfield(

"parch"

, doubletype())

, structfield(

"ticket"

, stringtype())

, structfield(

"fare"

, doubletype())

, structfield(

"cabin"

, stringtype())

, structfield(

"embarked"

, stringtype())

])df_raw = spark \

.read \

.option(

"header"

,"true"

) \ .schema(schema) \

.csv(

"train.csv"

)df_raw.show(20)

df = df_raw.na.fill(0)

vectorassembler = vectorassembler(

) \ .setinputcols(

["pclass"

,"age"

,"sibsp"

,"parch"

,"fare"

]) \

.setoutputcol(

"features"

)from sparkxgb import xgboostclassifier

xgboost = xgboostclassifier(

featurescol=

"features"

, labelcol=

"survived"

, predictioncol=

"prediction"

, missing=

0.0)

pipeline = pipeline(stages=

[vectorassembler, xgboost]

)# randomsplit 隨機分為測試集合訓練集

traindf, testdf = df.randomsplit(

[0.8

,0.2

], seed=24)

traindf.show(2)

print

("************************開始訓練****************************"

)# 擬合模型

model = pipeline.fit(traindf)

print

("************************訓練結束****************************"

)print

("************************開始******************************"

)model.transform(testdf)

.select(

"passengerid"

,"survived"

,"prediction"

).show(

)print

("**************************結束*****************************"

)# 輸出的所有結果

model.transform(testdf)

.show(

)print

("程式結束"

1.上面兩個jar包必須放到spark-submit提交引數裡面。os.environ['pyspark_submit_args'] = '--jars /data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-spark-0.90.jar,/data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-0.90.jar pyspark-shell2.將sparkxgb.zip 解壓到python3 的包的安裝目錄裡面,linux裡面預設安裝路徑如下/usr/local/python3/lib/python3.6/site-packages

3.如果不想將sparkxgb.zip解壓到python包的安裝目錄,不想把jar包放到python**裡面可以。那麼就可以使用spark shell首先要注釋:

os.environ['pyspark_submit_args'] = '--jars /data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-spark-0.90.jar,/data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-0.90.jar pyspark-shell

然後在linux裡面執行如下spark shell命令:

spark-submit --master yarn --py-files /

data

/pycharm/zhanglong/pysparkxgboostnew/sparkxgb.zip --jars /

data

/pycharm/zhanglon/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-0.90.jar,

/data

/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-spark-0.90.jar /

data

/pycharm/zhanglong/pysparkxgboostnew/test.py

zip包和jar包需要指定到具體的位置。

4.spark 預設讀取的csv檔案在hdfs的 /user/root/ 目錄下,執行前需要提前將train.csv檔案上傳到該目錄下面。

如需sparkxgb.zip包和兩個jar包和訓練集可以q:2316352792

分布式系統 Spark

特點 粗粒度的變換。如 map,filter,join 行為 需要得出結果時呼叫 5部分操作意義 資料分割槽集 partitions partition是資料集的最小單位,即乙個shard 位置preferredlocations 輸入partition,輸出是該資料所在的位置 此分割槽在哪台機器上...

Jenkins 分布式執行

master sl e jenkins部署到linux伺服器,執行在windows本地 1 sl e向master報道 jenkins配置 節點管理 配置節點 通過launch,安裝jar包連線主機 2.正常配置jenkins任務 區別點 在general中設定 restrict配置,label是在...

Spark 偽分布式安裝教程

mr跑迭代演算法的侷限性太大,後續想將一部分任務轉移到spark上。公司其他組每天有提交spark任務在yarn上執行。但是他們的客戶機,我們組沒有許可權登入,而且他們也沒有相應的測試機器。於是一咬牙,一跺腳,算了,自己搭環境吧。找了臺我們自己的測試機開幹。給大家上個spark版本資訊的圖 基本每隔...