Solutions for handling dependency modules when submitting PySpark jobs


First, submit test.py in client mode without shipping any dependencies:

spark-submit --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 --num-executors 3 --properties-file /etc/spark/conf/spark-defaults.conf test.py

test.py imports a function from a local package named helper and uses it inside a map:
from helper.util_helper import sub_name

data_converted = data.map(
    lambda x: (sub_name(x[2][1]), sub_name(x[1][1]), sub_name(x[2][1]))
)

Run this way, the driver can import helper locally, but the executors cannot, so the tasks fail on the import (typically with ModuleNotFoundError: No module named 'helper'). The fix is to pack the package into a zip; the helper/ directory should contain an __init__.py so it imports as a regular package:

zip -r helper.zip helper/
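A quick sanity check of what went into the archive, as a minimal sketch; the entries named in the comment are assumptions based on the layout above:

import zipfile

# List the archive contents before shipping the zip to the cluster.
with zipfile.ZipFile("helper.zip") as zf:
    print(zf.namelist())  # expect entries like 'helper/__init__.py' and 'helper/util_helper.py'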

Then submit again, this time shipping the zip with --py-files so every executor gets it on its Python path:

spark-submit --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 --num-executors 3 --properties-file /etc/spark/conf/spark-defaults.conf --py-files ./helper.zip test.py
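The same effect can also be had from inside the driver program instead of the command line. A minimal sketch, assuming helper.zip sits in the working directory and a hypothetical app name; SparkContext.addPyFile ships the file and puts it on each executor's sys.path:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("addpyfile-demo").getOrCreate()  # hypothetical app name
spark.sparkContext.addPyFile("helper.zip")  # distributed to every executor

def use_helper(value):
    # Import inside the task so the module is resolved on the executor,
    # after the zip has been shipped there.
    from helper.util_helper import sub_name
    return sub_name(value)

print(spark.sparkContext.parallelize(["a", "b"]).map(use_helper).collect())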
Another probe function, get_bd_res, returns the executor's sys.path together with the body of an HTTP request made with requests; it checks both that helper.zip is on the path and that a third-party library can be imported on the executors:

def get_bd_res():
    import sys
    # return sys.path
    import requests
    url = ""        # placeholder endpoint: fill in a real url, payload and headers
    payload = {}
    headers = {}
    return str(sys.path) + requests.request("GET", url, headers=headers, data=payload).text

# Test that the packaged dependency uploaded to the cluster is picked up

from helper.util_helper import sub_name

data_converted = data.map(
    lambda x: (get_bd_res(), get_bd_res(), sub_name(x[2][1]))
)
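To inspect what the probe reported, one element is enough; this is a minimal sketch, assuming the data RDD defined in test.py below and a real endpoint filled into get_bd_res. When --py-files was passed, helper.zip should show up in the executor's sys.path:

# The first field of a converted row is str(sys.path) from the executor that
# ran the task, followed by the HTTP response body.
first = data_converted.take(1)[0]
print(first[0])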

--py-files covers pure-Python modules; when the executors also need a particular interpreter version or third-party packages such as requests, ship a whole Python environment as an archive. Install Anaconda on the submitting machine, create a python3.6 environment, zip the install, and upload the zip to HDFS:

wget 
bash Anaconda3-5.2.0-Linux-x86_64.sh
conda create -n python3.6 python==3.6 anaconda
zip -r anaconda3.zip anaconda3/*
hadoop fs -put ./anaconda3.zip /data/ai

With the environment on HDFS, submit in client mode and hand the archive to the executors:

spark-submit \
  --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 --num-executors 3 \
  --properties-file /etc/spark/conf/spark-defaults.conf \
  --py-files ./helper.zip \
  --archives hdfs:///data/ai/anaconda3.zip#anaconda3 \
  --conf spark.pyspark.python=./anaconda3/anaconda3/envs/python3.6/bin/python3 \
  test.py

The #anaconda3 suffix on --archives must not be omitted: it is the name of the directory the zip is unpacked into after the cluster workers pull it down. spark.pyspark.python then points the workers at the python3.6 interpreter unpacked from that HDFS archive.
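Note that spark.pyspark.python governs the executors; in client mode the driver runs on the submitting machine, and its interpreter can be pinned separately with spark.pyspark.driver.python, which is usually passed with --conf at submit time since the driver's Python is already running by the time application code executes. A minimal SparkConf sketch of the two keys, with a hypothetical local driver path:

from pyspark.conf import SparkConf

# Sketch only: the executor interpreter comes from the unpacked archive; the
# driver path below is a hypothetical local Anaconda install on the submitting machine.
conf = (SparkConf()
        .set("spark.pyspark.python", "./anaconda3/anaconda3/envs/python3.6/bin/python3")
        .set("spark.pyspark.driver.python", "/opt/anaconda3/envs/python3.6/bin/python3"))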

The same works in cluster mode, where the driver itself also runs on YARN; a queue and an application name are added here:

spark-submit --deploy-mode cluster \
  --driver-memory 2g --executor-memory 2g --executor-cores 3 --num-executors 3 \
  --properties-file /etc/spark/conf/spark-defaults.conf \
  --py-files ./helper.zip --archives hdfs:///data/ai/anaconda3.zip#anaconda3 \
  --queue root.default \
  --name my.test.py \
  --conf spark.pyspark.python=./anaconda3/anaconda3/envs/python3.6/bin/python3 \
  test.py
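A quick way to confirm that tasks really run under the archived interpreter is to ask each of them for sys.executable; a minimal sketch, assuming a SparkSession named spark:

import sys

# Each task reports the Python binary it runs under; with --archives and
# spark.pyspark.python set as above, the paths should point inside
# ./anaconda3/anaconda3/envs/python3.6/.
paths = spark.sparkContext.parallelize(range(3), 3).map(lambda _: sys.executable).collect()
print(set(paths))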

test.py

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

default_yarn_queue = "root.default"
default_master = "yarn"


def open_spark_session(app_name="ai-train", executor_memory="2g", executor_instances="3",
                       executor_cores="2", driver_memory="2g"):
    conf = get_spark_config(app_name=app_name, executor_memory=executor_memory,
                            executor_instances=executor_instances, executor_cores=executor_cores,
                            driver_memory=driver_memory, yarn_queue=default_yarn_queue)
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    return spark


def get_spark_config(master="yarn-client", app_name="ai-train", executor_memory="2g",
                     executor_instances="3", executor_cores="2", driver_memory="2g",
                     yarn_queue="root.ai"):
    conf = (SparkConf()
            .setMaster(master)
            .setAppName(app_name)
            .set("spark.executor.memory", executor_memory)
            .set("spark.executor.instances", executor_instances)
            .set("spark.executor.cores", executor_cores)
            .set("spark.driver.memory", driver_memory)
            .set("spark.yarn.queue", yarn_queue))
    return conf


spark = open_spark_session("test")
data = spark.sparkContext.parallelize([
    [('id', 'a0w1a0000003xb1a'), ('packsize', 1.0), ('name', 'a')],
    [('id', 'a0w1a0000003xaai'), ('packsize', 1.0), ('name', 'b')],
    [('id', 'a0w1a00000xb3aai'), ('packsize', 30.0), ('name', 'c')],
])
data_converted = data.map(lambda x: (x[2][1], x[1][1], x[2][1]))
print(data_converted.take(3))
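Since each record is a list of (key, value) pairs, x[2][1] is the name and x[1][1] the packsize, so the take(3) above should print something like [('a', 1.0, 'a'), ('b', 1.0, 'b'), ('c', 30.0, 'c')].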

helper/util_helper.py

def sub_name(name):
    return "testaaa" + str(name)

Finally, a reminder that Spark picks up configuration from three places, applied in this order of precedence (highest first): the SparkConf configured in code, the parameters specified on the spark-submit command line, and configuration files such as spark-defaults.conf.
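To see which value actually won, the effective setting can be read back from the running context; a minimal sketch, assuming a SparkSession named spark:

# Print the settings the application is actually running with, whichever
# source (code, command line, spark-defaults.conf) they came from.
conf = spark.sparkContext.getConf()
print(conf.get("spark.executor.memory", "not set"))
print(conf.get("spark.yarn.queue", "not set"))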
