Tensorflow'u dağıtmak için yeni biriyim ve CPU'larda senkronize eğitim yapmak için iyi bir örnek arıyorum.Dağıtılmış Tensorflow: CPU'lar üzerinde senkronize eğitim için iyi bir örnek
Zaten Distributed Tensorflow Example'u denedim ve eşzamansız eğitimi 1 parametre sunucusu (1 CPU'lu 1 makine) ve 3 işçi (her işçi = 1 CPU'lu 1 makine) üzerinden başarıyla gerçekleştirebilir. Bununla birlikte, senkronize eğitim söz konusu olduğunda, SyncReplicasOptimizer(V1.0 and V2.0)'un eğitimini takip etmeme rağmen, doğru şekilde çalıştıramam.
Resmi senkronize olmayan eğitim örneğine resmi SyncReplicasOptimizer kodunu ekledim, ancak eğitim işlemi hala senkronize değil. Detaylı kodum aşağıdaki gibidir. Senkron eğitim ile ilgili herhangi bir kod ****** bloğu içinde yer almaktadır.
import tensorflow as tf
import sys
import time
# cluster specification ----------------------------------------------------------------------
parameter_servers = ["xx1.edu:2222"]
workers = ["xx2.edu:2222", "xx3.edu:2222", "xx4.edu:2222"]
cluster = tf.train.ClusterSpec({"ps":parameter_servers, "worker":workers})
# input flags
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS
# start a server for a specific task
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
# Parameters ----------------------------------------------------------------------
N = 3 # number of replicas
learning_rate = 0.001
training_epochs = int(21/N)
batch_size = 100
# Network Parameters
n_input = 784 # MNIST data input (img shape: 28*28)
n_hidden_1 = 256 # 1st layer number of features
n_hidden_2 = 256 # 2nd layer number of features
n_classes = 10 # MNIST total classes (0-9 digits)
if FLAGS.job_name == "ps":
server.join()
print("--- Parameter Server Ready ---")
elif FLAGS.job_name == "worker":
# Import MNIST data
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)
# Between-graph replication
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
# count the number of updates
global_step = tf.get_variable('global_step', [],
initializer = tf.constant_initializer(0),
trainable = False,
dtype = tf.int32)
# tf Graph input
x = tf.placeholder("float", [None, n_input])
y = tf.placeholder("float", [None, n_classes])
# Create model
def multilayer_perceptron(x, weights, biases):
# Hidden layer with RELU activation
layer_1 = tf.add(tf.matmul(x, weights['h1']), biases['b1'])
layer_1 = tf.nn.relu(layer_1)
# Hidden layer with RELU activation
layer_2 = tf.add(tf.matmul(layer_1, weights['h2']), biases['b2'])
layer_2 = tf.nn.relu(layer_2)
# Output layer with linear activation
out_layer = tf.matmul(layer_2, weights['out']) + biases['out']
return out_layer
# Store layers weight & bias
weights = {
'h1': tf.Variable(tf.random_normal([n_input, n_hidden_1])),
'h2': tf.Variable(tf.random_normal([n_hidden_1, n_hidden_2])),
'out': tf.Variable(tf.random_normal([n_hidden_2, n_classes]))
}
biases = {
'b1': tf.Variable(tf.random_normal([n_hidden_1])),
'b2': tf.Variable(tf.random_normal([n_hidden_2])),
'out': tf.Variable(tf.random_normal([n_classes]))
}
# Construct model
pred = multilayer_perceptron(x, weights, biases)
# Define loss and optimizer
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(pred, y))
# ************************* SyncReplicasOpt Version 1.0 *****************************************************
''' This optimizer collects gradients from all replicas, "summing" them,
then applying them to the variables in one shot, after which replicas can fetch the new variables and continue. '''
# Create any optimizer to update the variables, say a simple SGD
opt = tf.train.AdamOptimizer(learning_rate=learning_rate)
# Wrap the optimizer with sync_replicas_optimizer with N replicas: at each step the optimizer collects N gradients before applying to variables.
opt = tf.train.SyncReplicasOptimizer(opt, replicas_to_aggregate=N,
replica_id=FLAGS.task_index, total_num_replicas=N)
# Now you can call `minimize()` or `compute_gradients()` and `apply_gradients()` normally
train = opt.minimize(cost, global_step=global_step)
# You can now call get_init_tokens_op() and get_chief_queue_runner().
# Note that get_init_tokens_op() must be called before creating session
# because it modifies the graph.
init_token_op = opt.get_init_tokens_op()
chief_queue_runner = opt.get_chief_queue_runner()
# **************************************************************************************
# Test model
correct = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
accuracy = tf.reduce_mean(tf.cast(correct, "float"))
# Initializing the variables
init_op = tf.initialize_all_variables()
print("---Variables initialized---")
# **************************************************************************************
is_chief = (FLAGS.task_index == 0)
# Create a "supervisor", which oversees the training process.
sv = tf.train.Supervisor(is_chief=is_chief,
logdir="/tmp/train_logs",
init_op=init_op,
global_step=global_step,
save_model_secs=600)
# **************************************************************************************
with sv.prepare_or_wait_for_session(server.target) as sess:
# **************************************************************************************
# After the session is created by the Supervisor and before the main while loop:
if is_chief:
sv.start_queue_runners(sess, [chief_queue_runner])
# Insert initial tokens to the queue.
sess.run(init_token_op)
# **************************************************************************************
# Statistics
net_train_t = 0
# Training
for epoch in range(training_epochs):
total_batch = int(mnist.train.num_examples/batch_size)
# Loop over all batches
for i in range(total_batch):
batch_x, batch_y = mnist.train.next_batch(batch_size)
# ======== net training time ========
begin_t = time.time()
sess.run(train, feed_dict={x: batch_x, y: batch_y})
end_t = time.time()
net_train_t += (end_t - begin_t)
# ===================================
# Calculate training accuracy
# acc = sess.run(accuracy, feed_dict={x: mnist.train.images, y: mnist.train.labels})
# print("Epoch:", '%04d' % (epoch+1), " Train Accuracy =", acc)
print("Epoch:", '%04d' % (epoch+1))
print("Training Finished!")
print("Net Training Time: ", net_train_t, "second")
# Testing
print("Testing Accuracy = ", accuracy.eval({x: mnist.test.images, y: mnist.test.labels}))
sv.stop()
print("done")
Kodumda sorun mu var? Ya da takip etmek için iyi bir örnek alabilir miyim?
kod yüzeysel doğru görünüyorsa, bunu eşzamanlı çalıştırmak için minimize yönteminde bir aggregation_method belirtmek gerekir, ancak 'tf.train.SyncReplicasOptimizer' arayüzü oldukça karmaşıktır, Yani hala bir böcek olabilir. "Eğitim süreci hala asenkron" derken, bunu nasıl gözlemlediniz? – mrry
Cevabınız için teşekkür ederiz, @mry. İdeal bir sınamada, "Epoch #i" nin her bir işçiye aynı zamanda basıldığını görmeyi bekleriz, fakat gözlemlediğim şey şu: İşçi 0'daki "Epoch 1" - (3 dk sonra) -> " Epoch 1 "işçi 1 'e - (3 dakika sonra) -> işçi 2'ye" Epoch 1 "- (3 dakika sonra) -> işçi 0'daki" Epoch 2 "- (3 dk sonra) -> İşçi 1'deki "Epoch 2" - (3 dk sonra) -> İşçi 2'deki "Epoch 2" - (3 dk sonra) -> işçi 0 .... üzerindeki "Epoch 3" sonu. Yani tam olarak tensorflow sin-eğitiminde neler oluyor? Neden düzenli bir çağ dönemi eğitimi var? –
Ayrıca bunu merak ediyorum. Bazen bir CPU'nun arkasına geçip geçemeyeceğini merak ediyorum ve bir CPU'dan iki grup topluyor ve diğer CPU'lardan birinin geride kalmasına izin veriyor. – Aaron