2016-12-23 52 views
8

Tensorflow'u dağıtmak için yeni biriyim ve CPU'larda senkronize eğitim yapmak için iyi bir örnek arıyorum.Dağıtılmış Tensorflow: CPU'lar üzerinde senkronize eğitim için iyi bir örnek

Zaten Distributed Tensorflow Example'u denedim ve eşzamansız eğitimi 1 parametre sunucusu (1 CPU'lu 1 makine) ve 3 işçi (her işçi = 1 CPU'lu 1 makine) üzerinden başarıyla gerçekleştirebilir. Bununla birlikte, senkronize eğitim söz konusu olduğunda, SyncReplicasOptimizer(V1.0 and V2.0)'un eğitimini takip etmeme rağmen, doğru şekilde çalıştıramam.

Resmi senkronize olmayan eğitim örneğine resmi SyncReplicasOptimizer kodunu ekledim, ancak eğitim işlemi hala senkronize değil. Detaylı kodum aşağıdaki gibidir. Senkron eğitim ile ilgili herhangi bir kod ****** bloğu içinde yer almaktadır.

import tensorflow as tf 
import sys 
import time 

# cluster specification ---------------------------------------------------------------------- 
parameter_servers = ["xx1.edu:2222"] 
workers = ["xx2.edu:2222", "xx3.edu:2222", "xx4.edu:2222"] 
cluster = tf.train.ClusterSpec({"ps":parameter_servers, "worker":workers}) 

# input flags 
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'") 
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") 
FLAGS = tf.app.flags.FLAGS 

# start a server for a specific task 
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) 

# Parameters ---------------------------------------------------------------------- 
N = 3 # number of replicas 
learning_rate = 0.001 
training_epochs = int(21/N) 
batch_size = 100 

# Network Parameters 
n_input = 784 # MNIST data input (img shape: 28*28) 
n_hidden_1 = 256 # 1st layer number of features 
n_hidden_2 = 256 # 2nd layer number of features 
n_classes = 10 # MNIST total classes (0-9 digits) 

if FLAGS.job_name == "ps": 
    server.join() 
    print("--- Parameter Server Ready ---") 
elif FLAGS.job_name == "worker": 
    # Import MNIST data 
    from tensorflow.examples.tutorials.mnist import input_data 
    mnist = input_data.read_data_sets("/tmp/data/", one_hot=True) 
    # Between-graph replication 
    with tf.device(tf.train.replica_device_setter(
     worker_device="/job:worker/task:%d" % FLAGS.task_index, 
     cluster=cluster)): 
     # count the number of updates 
     global_step = tf.get_variable('global_step', [], 
             initializer = tf.constant_initializer(0), 
             trainable = False, 
             dtype = tf.int32) 
     # tf Graph input 
     x = tf.placeholder("float", [None, n_input]) 
     y = tf.placeholder("float", [None, n_classes]) 

     # Create model 
     def multilayer_perceptron(x, weights, biases): 
      # Hidden layer with RELU activation 
      layer_1 = tf.add(tf.matmul(x, weights['h1']), biases['b1']) 
      layer_1 = tf.nn.relu(layer_1) 
      # Hidden layer with RELU activation 
      layer_2 = tf.add(tf.matmul(layer_1, weights['h2']), biases['b2']) 
      layer_2 = tf.nn.relu(layer_2) 
      # Output layer with linear activation 
      out_layer = tf.matmul(layer_2, weights['out']) + biases['out'] 
      return out_layer 

     # Store layers weight & bias 
     weights = { 
      'h1': tf.Variable(tf.random_normal([n_input, n_hidden_1])), 
      'h2': tf.Variable(tf.random_normal([n_hidden_1, n_hidden_2])), 
      'out': tf.Variable(tf.random_normal([n_hidden_2, n_classes])) 
     } 
     biases = { 
      'b1': tf.Variable(tf.random_normal([n_hidden_1])), 
      'b2': tf.Variable(tf.random_normal([n_hidden_2])), 
      'out': tf.Variable(tf.random_normal([n_classes])) 
     } 

     # Construct model 
     pred = multilayer_perceptron(x, weights, biases) 

     # Define loss and optimizer 
     cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(pred, y)) 

     # ************************* SyncReplicasOpt Version 1.0 ***************************************************** 
     ''' This optimizer collects gradients from all replicas, "summing" them, 
     then applying them to the variables in one shot, after which replicas can fetch the new variables and continue. ''' 
     # Create any optimizer to update the variables, say a simple SGD 
     opt = tf.train.AdamOptimizer(learning_rate=learning_rate) 

     # Wrap the optimizer with sync_replicas_optimizer with N replicas: at each step the optimizer collects N gradients before applying to variables. 
     opt = tf.train.SyncReplicasOptimizer(opt, replicas_to_aggregate=N, 
             replica_id=FLAGS.task_index, total_num_replicas=N) 

     # Now you can call `minimize()` or `compute_gradients()` and `apply_gradients()` normally 
     train = opt.minimize(cost, global_step=global_step) 

     # You can now call get_init_tokens_op() and get_chief_queue_runner(). 
     # Note that get_init_tokens_op() must be called before creating session 
     # because it modifies the graph. 
     init_token_op = opt.get_init_tokens_op() 
     chief_queue_runner = opt.get_chief_queue_runner() 
     # ************************************************************************************** 

     # Test model 
     correct = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1)) 
     accuracy = tf.reduce_mean(tf.cast(correct, "float")) 

     # Initializing the variables 
     init_op = tf.initialize_all_variables() 
     print("---Variables initialized---") 

    # ************************************************************************************** 
    is_chief = (FLAGS.task_index == 0) 
    # Create a "supervisor", which oversees the training process. 
    sv = tf.train.Supervisor(is_chief=is_chief, 
          logdir="/tmp/train_logs", 
          init_op=init_op, 
          global_step=global_step, 
          save_model_secs=600) 
    # ************************************************************************************** 

    with sv.prepare_or_wait_for_session(server.target) as sess: 
     # **************************************************************************************   
     # After the session is created by the Supervisor and before the main while loop: 
     if is_chief: 
      sv.start_queue_runners(sess, [chief_queue_runner]) 
      # Insert initial tokens to the queue. 
      sess.run(init_token_op) 
     # ************************************************************************************** 
     # Statistics 
     net_train_t = 0 
     # Training 
     for epoch in range(training_epochs): 
      total_batch = int(mnist.train.num_examples/batch_size) 
      # Loop over all batches 
      for i in range(total_batch): 
       batch_x, batch_y = mnist.train.next_batch(batch_size) 
       # ======== net training time ======== 
       begin_t = time.time() 
       sess.run(train, feed_dict={x: batch_x, y: batch_y}) 
       end_t = time.time() 
       net_train_t += (end_t - begin_t) 
       # =================================== 
      # Calculate training accuracy 
      # acc = sess.run(accuracy, feed_dict={x: mnist.train.images, y: mnist.train.labels}) 
      # print("Epoch:", '%04d' % (epoch+1), " Train Accuracy =", acc) 
      print("Epoch:", '%04d' % (epoch+1)) 
     print("Training Finished!") 
     print("Net Training Time: ", net_train_t, "second") 
     # Testing 
     print("Testing Accuracy = ", accuracy.eval({x: mnist.test.images, y: mnist.test.labels})) 

    sv.stop() 
    print("done") 

Kodumda sorun mu var? Ya da takip etmek için iyi bir örnek alabilir miyim?

+0

kod yüzeysel doğru görünüyorsa, bunu eşzamanlı çalıştırmak için minimize yönteminde bir aggregation_method belirtmek gerekir, ancak 'tf.train.SyncReplicasOptimizer' arayüzü oldukça karmaşıktır, Yani hala bir böcek olabilir. "Eğitim süreci hala asenkron" derken, bunu nasıl gözlemlediniz? – mrry

+0

Cevabınız için teşekkür ederiz, @mry. İdeal bir sınamada, "Epoch #i" nin her bir işçiye aynı zamanda basıldığını görmeyi bekleriz, fakat gözlemlediğim şey şu: İşçi 0'daki "Epoch 1" - (3 dk sonra) -> " Epoch 1 "işçi 1 'e - (3 dakika sonra) -> işçi 2'ye" Epoch 1 "- (3 dakika sonra) -> işçi 0'daki" Epoch 2 "- (3 dk sonra) -> İşçi 1'deki "Epoch 2" - (3 dk sonra) -> İşçi 2'deki "Epoch 2" - (3 dk sonra) -> işçi 0 .... üzerindeki "Epoch 3" sonu. Yani tam olarak tensorflow sin-eğitiminde neler oluyor? Neden düzenli bir çağ dönemi eğitimi var? –

+0

Ayrıca bunu merak ediyorum. Bazen bir CPU'nun arkasına geçip geçemeyeceğini merak ediyorum ve bir CPU'dan iki grup topluyor ve diğer CPU'lardan birinin geride kalmasına izin veriyor. – Aaron

cevap

0

Arka planda MPI kullanan kullanıcı saydam dağıtılmış tensorflow ile ilgilenirseniz emin değilim. Kısa bir süre önce MaTEx: https://github.com/matex-org/matex ile böyle bir versiyon geliştirdik.

Bu nedenle, dağıtılan TensorFlow için, tüm değişiklikler kullanıcıdan soyutlandığından, bir SyncReplicaOptimizer kodu yazmanız gerekmeyecektir.

Bu yardımcı olur umarım.

0

Sorunun, tensorflow'un #9596 sayısında yer alan yorumlar olarak yanıtlanabileceğini düşünüyorum. Bu sorun, tf.train.SyncReplicasOptimizer() dosyasının yeni sürümünün hatalarından kaynaklanmaktadır. Bu sorunu önlemek için bu API'nın eski sürümünü kullanabilirsiniz.

Başka bir çözüm, Tensorflow Distributed Benchmarks kaynaklıdır. Kaynak koduna bir göz atın ve tensorflow'daki kuyruktaki işçileri manuel olarak senkronize ettiklerini görebilirsiniz. Deneyler yoluyla, bu karşılaştırma tam olarak beklediğiniz gibi çalışır.

Bu yorumlar ve kaynaklar sorununuzu çözmenize yardımcı olabilir. Teşekkürler!

0

Bir konu

train = opt.minimize(cost, global_step=global_step, aggregation_method=tf.AggregationMethod.ADD_N)