基於ray的分布式機器學習(一)

2022-09-06 11:48:15 字數 3603 閱讀 7166

基本思路:

1、對資料分塊,使用多個worker分別處理乙個資料塊,每個worker暴露兩個介面,分別是損失計算的介面loss和梯度計算的介面grad;

2、同時定義full_loss和full_grad介面對每個worker的loss和grad進行聚合;

3、使用bfgs演算法進行引數優化,

分別使用full_loss和full_grad作為bfgs的損失函式和梯度函式,即可進行網路引數優化;

注意:在此實現中,每個worker內部每次均計算乙個資料塊上的損失和梯度,而非乙個batch

#0、匯入依賴

import

numpy as np

import

osimport

scipy.optimize

import

tensorflow as tf

from tensorflow.examples.tutorials.mnist import

input_data

import

rayimport

ray.experimental.tf_utils

#1、定義模型

class

linearmodel(object):

def__init__

(self, shape):

"""creates a linearmodel object.

"""x =tf.placeholder(tf.float32, [none, shape[0]])

w =tf.variable(tf.zeros(shape))

b = tf.variable(tf.zeros(shape[1]))

self.x =x

self.w =w

self.b =b

y = tf.nn.softmax(tf.matmul(x, w) +b)

y_ = tf.placeholder(tf.float32, [none, shape[1]])

self.y_ =y_

cross_entropy =tf.reduce_mean(

-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))

self.cross_entropy =cross_entropy

self.cross_entropy_grads =tf.gradients(cross_entropy, [w, b])

self.sess =tf.session()

self.variables =ray.experimental.tf_utils.tensorflowvariables(

cross_entropy, self.sess)

defloss(self, xs, ys):

"""計算loss

"""return

float(

self.sess.run(

self.cross_entropy, feed_dict=))

defgrad(self, xs, ys):

"""計算梯度

"""return

self.sess.run(

self.cross_entropy_grads, feed_dict=)

#2、定義遠端worker,用於計算模型loss、grads

@ray.remote

class

netactor(object):

def__init__

(self, xs, ys):

os.environ[

"cuda_visible_devices

"] = ""

with tf.device(

"/cpu:0"):

self.net = linearmodel([784, 10])

self.xs =xs

self.ys =ys

#計算乙個資料塊的loss

defloss(self, theta):

net =self.net

net.variables.set_flat(theta)

return

net.loss(self.xs, self.ys)

#計算乙個資料塊的梯度

defgrad(self, theta):

net =self.net

net.variables.set_flat(theta)

gradients =net.grad(self.xs, self.ys)

return np.concatenate([g.flatten() for g in

gradients])

defget_flat_size(self):

return

self.net.variables.get_flat_size()

#3、獲取遠端worker損失的函式

deffull_loss(theta):

theta_id =ray.put(theta)

loss_ids = [actor.loss.remote(theta_id) for actor in

actors]

return

sum(ray.get(loss_ids))

#4、獲取遠端worker梯度的函式

deffull_grad(theta):

theta_id =ray.put(theta)

grad_ids = [actor.grad.remote(theta_id) for actor in

actors]

#使用fmin_l_bfgs_b須轉換為float64資料型別

return sum(ray.get(grad_ids)).astype("

float64")

#5、使用lbfgs進行訓練

if__name__ == "

__main__":

ray.init()

mnist = input_data.read_data_sets("

mnist_data

", one_hot=true)  #

資料分塊,每個worker跑乙個資料塊

num_batches = 10batch_size = mnist.train.num_examples //num_batches

batches = [mnist.train.next_batch(batch_size) for _ in

range(num_batches)]

actors = [netactor.remote(xs, ys) for (xs, ys) in

batches]  #

引數初始化

dim =ray.get(actors[0].get_flat_size.remote())

theta_init = 1e-2 * np.random.normal(size=dim)  #

優化 result =scipy.optimize.fmin_l_bfgs_b(

full_loss, theta_init, maxiter=10, fprime=full_grad, disp=true)

分布式機器學習第3章 分布式機器學習框架

q 需要使用到分布式機器學習有哪三種情形?q 對於計算量太大時的分布式機器學習解決辦法 q 對於訓練資料太多時的分布式機器學習解決辦法 q 對於模型規模太大時的分布式機器學習解決辦法 q 目前分布式機器學習領域的主要矛盾是?q 分布式機器學習的主要組成模組有哪四個?q 分布式機器學習的資料劃分中,對...

分布式機器學習dask

分布式機器學習 dask是乙個資料分析的平行計算的框架。pip安裝 pip install dask compete install everything pip install dask install only core cluster 部署 安裝dask 1.2.2 conda install...

分布式機器學習主要筆記

mahout是hadoop的乙個機器學習庫,主要的程式設計模型是mapreduce 每個企業的資料都是多樣的和特別針對他們需求的。然而,在對那些資料的分析種類上卻沒多少多樣性。mahout專案是實施普通分析計算的乙個hadoop庫。用例包括使用者協同過濾 使用者建議 聚類和分類。mllib 執行在s...