TensorFlow分布式實踐

2022-03-17 16:31:49 字數 4383 閱讀 3101

大資料時代,基於單機的建模很難滿足企業不斷增長的資料量級的需求,開發者需要使用分布式的開發方式,在集群上進行建模。而單機和分布式的開發**有一定的區別,本文就將為開發者們介紹,基於tensorflow進行分布式開發的兩種方式,幫助開發者在實踐的過程中,更好地選擇模組的開發方向。

基於tensorflow原生的分布式開發

分布式開發會涉及到更新梯度的方式,有同步和非同步的兩個方案,同步更新的方式在模型的表現上能更快地進行收斂,而非同步更新時,迭代的速度則會更加快。兩種更新方式的圖示如下:

同步更新流程

非同步更新流程

tensorflow是基於ps、work 兩種伺服器進行分布式的開發。ps伺服器可以只用於引數的彙總更新,讓各個work進行梯度的計算。

基於tensorflow原生的分布式開發的具體流程如下:

首先指定ps 伺服器啟動引數 –job_name=ps:

python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=ps --task_index=0
接著指定work伺服器引數(啟動兩個work 節點) –job_name=work2:

python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=0

python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=1

之後,上述指定的引數 worker_hosts ps_hosts job_name task_index 都需要在py檔案中接受使用:

worker_hosts

", "

預設值", "

描述說明

")接收引數後,需要分別註冊ps、work,使他們各司其職:

ps_hosts = flags.ps_hosts.split(","

)worker_hosts = flags.worker_hosts.split(","

)cluster = tf.train.clusterspec()

server = tf.train.server(cluster,job_name=flags.job_name,task_index=flags.task_index)

issync =flags.issync

if flags.job_name == "ps"

: server.join()

elif flags.job_name == "

worker":

with tf.device(tf.train.replica_device_setter(

worker_device="

/job:worker/task:%d

" %flags.task_index,

cluster=cluster)):

繼而更新梯度。

(1)同步更新梯度:

rep_op =tf.train.syncreplicasoptimizer(optimizer,

replicas_to_aggregate=len(worker_hosts),

replica_id=flags.task_index,

total_num_replicas=len(worker_hosts),

use_locking=true)

init_token_op =rep_op.get_init_tokens_op()

chief_queue_runner = rep_op.get_chief_queue_runner()

(2)非同步更新梯度:

最後,使用tf.train.supervisor 進行真的迭代

另外,開發者還要注意,如果是同步更新梯度,則還需要加入如下**:

sv.start_queue_runners(sess, [chief_queue_runner])

sess.run(init_token_op)

需要注意的是,上述非同步的方式需要自行指定集群ip和埠,不過,開發者們也可以借助tensorflowonspark,使用yarn進行管理。

基於tensorflowonspark的分布式開發

基於tensorflowonspark的分布式開發的具體流程如下:

首先,需要使用spark-submit來提交任務,同時指定spark需要執行的引數(–num-executors 6等)、模型**、模型超參等,同樣需要接受外部引數:

parser =argparse.argumentparser()

parser.add_argument("-i

", "

--tracks

", help="

資料集路徑

")

args = parser.parse_args()

之後,準備好引數和訓練資料(dataframe),呼叫模型的api進行啟動。

其中,soft_dist.map_fun是要調起的方法,後面均是模型訓練的引數。

estimator =tfestimator(soft_dist.map_fun, args) \

'tracks

': '

tracks

', '

label

': '

label

'}) \

.setmodeldir(args.model) \

.setexportdir(args.serving) \

.setclustersize(args.cluster_size) \

.setnumps(num_ps) \

.setepochs(args.epochs) \

.setbatchsize(args.batch_size) \

.setsteps(args.max_steps)

model = estimator.fit(df)

接下來是soft_dist定義乙個 map_fun(args, ctx)的方法:

def

map_fun(args, ctx):

...worker_num = ctx.worker_num #

worker數量

job_name = ctx.job_name #

job名

task_index = ctx.task_index #

任務索引

if job_name == "

ps": #

ps節點(主節點)

time.sleep((worker_num + 1) * 5)

cluster, server = tfnode.start_cluster_server(ctx, 1, args.rdma)

num_workers = len(cluster.as_dict()['

worker'])

if job_name == "ps"

: server.join()

elif job_name == "

worker":

with tf.device(tf.train.replica_device_setter(worker_device="

/job:worker/task:%d

" % task_index, cluster=cluster)):

之後,可以使用tf.train.monitoredtrainingsession高階api,進行模型訓練和**。

總結

基於tensorflow的分布式開發大致就是本文中介紹的兩種情況,第二種方式可以用於實際的生產環境,穩定性會更高。

在執行結束的時候,開發者們也可通過設定郵件的通知,及時地了解到模型執行的情況。

同時,如果開發者使用sessionrunhook來儲存最後輸出的模型,也需要了解到,框架**中的乙個bug,即它只能在規定的時間內儲存,超出規定時間,即使執行沒有結束,程式也會被強制結束。如果開發者使用的版本是未修復bug的版本,則要自行處理,放寬執行時間。

TensorFlow分布式實踐

大資料時代,基於單機的建模很難滿足企業不斷增長的資料量級的需求,開發者需要使用分布式的開發方式,在集群上進行建模。而單機和分布式的開發 有一定的區別,本文就將為開發者們介紹,基於tensorflow進行分布式開發的兩種方式,幫助開發者在實踐的過程中,更好地選擇模組的開發方向。分布式開發會涉及到更新梯...

TensorFlow分布式計算

分布式tensorflow底層的通訊是grpc。grpc首先是乙個rpc,即遠端過程呼叫,通俗的解釋是 假設你在本機上執行一段 num add a,b 它呼叫了乙個過程call,然後返回了乙個值num,你感覺這段 只是在本機上執行的,但實際情況是,本機上的add方法是將引數打包傳送給伺服器,然後伺服...

saiku 分布式實踐

saiku比較吃記憶體,一旦人多了,那麼記憶體可能不夠,所以會考慮主從結構,分擔壓力。為了保證資料的穩定性,也會有類似的考慮,那麼問題來了,如何實現saiku的分布式搭建哪?首先saiku使用的jackrabbit儲存的元資料結構,而他使用repository資料夾儲存資料,所以分布式必然要共享資料...