2017-02-22 55 views
0

меня есть три узла для запуска распределенного tensorflow, который два работника (один имеет GPU, один нет) и один пс (без GPU) .The код ниже:Запуск распределенных tensorflow пример с ошибкой

from __future__ import print_function 

import tensorflow as tf 
import sys 
import time 

# cluster specification 
parameter_servers = ["192.168.1.102:2222"] 
workers = [ "192.168.1.103:2223", 
     "192.168.1.104:2224"] 
cluster = tf.train.ClusterSpec({"ps":parameter_servers, "worker":workers}) 

# input flags 
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'") 
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") 
FLAGS = tf.app.flags.FLAGS 

# start a server for a specific task 
server = tf.train.Server(cluster, 
         job_name=FLAGS.job_name, 
         task_index=FLAGS.task_index) 

# config 
batch_size = 100 
learning_rate = 0.001 
training_epochs = 20 
logs_path = "/tmp/mnist/1" 

# load mnist data set 
from tensorflow.examples.tutorials.mnist import input_data 
mnist = input_data.read_data_sets('MNIST_data', one_hot=True) 

if FLAGS.job_name == "ps": 
    server.join() 
elif FLAGS.job_name == "worker": 

    # Between-graph replication 
    with tf.device(tf.train.replica_device_setter(
    worker_device="/job:worker/task:%d" % FLAGS.task_index, 
    cluster=cluster)): 

    # count the number of updates 
    global_step = tf.get_variable('global_step', [], 
          initializer = tf.constant_initializer(0), 
          trainable = False) 

    # input images 
    with tf.name_scope('input'): 
     # None -> batch size can be any size, 784 -> flattened mnist image 
     x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input") 
     # target 10 output classes 
     y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input") 

    # model parameters will change during training so we use tf.Variable 
    tf.set_random_seed(1) 
    with tf.name_scope("weights"): 
     W1 = tf.Variable(tf.random_normal([784, 100])) 
     W2 = tf.Variable(tf.random_normal([100, 10])) 

    # bias 
    with tf.name_scope("biases"): 
     b1 = tf.Variable(tf.zeros([100])) 
     b2 = tf.Variable(tf.zeros([10])) 

    # implement model 
    with tf.name_scope("softmax"): 
     # y is our prediction 
     z2 = tf.add(tf.matmul(x,W1),b1) 
     a2 = tf.nn.sigmoid(z2) 
     z3 = tf.add(tf.matmul(a2,W2),b2) 
     y = tf.nn.softmax(z3) 

    # specify cost function 
    with tf.name_scope('cross_entropy'): 
     # this is our cost 
     cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1])) 

    # specify optimizer 
    with tf.name_scope('train'): 
     # optimizer is an "operation" which we can execute in a session 
     grad_op = tf.train.GradientDescentOptimizer(learning_rate) 
     train_op = grad_op.minimize(cross_entropy, global_step=global_step) 

    with tf.name_scope('Accuracy'): 
     # accuracy 
     correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1)) 
     accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) 

    # create a summary for our cost and accuracy 
    tf.scalar_summary("cost", cross_entropy) 
    tf.scalar_summary("accuracy", accuracy) 

    # merge all summaries into a single "operation" which we can execute in a session 
    summary_op = tf.merge_all_summaries() 
    init_op = tf.initialize_all_variables() 
    print("Variables initialized ...") 

    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), 
         global_step=global_step, 
         init_op=init_op) 

    begin_time = time.time() 
    frequency = 100 
    with sv.prepare_or_wait_for_session(server.target) as sess: 
    # create log writer object (this will log on every machine) 
    writer = tf.train.SummaryWriter(logs_path, graph=tf.get_default_graph()) 

    # perform training cycles 
    start_time = time.time() 
    for epoch in range(training_epochs): 

     # number of batches in one epoch 
     batch_count = int(mnist.train.num_examples/batch_size) 

     count = 0 
     for i in range(batch_count): 
     batch_x, batch_y = mnist.train.next_batch(batch_size) 

     # perform the operations we defined earlier on batch 
     _, cost, summary, step = sess.run(
        [train_op, cross_entropy, summary_op, global_step], 
        feed_dict={x: batch_x, y_: batch_y}) 
     writer.add_summary(summary, step) 

     count += 1 
     if count % frequency == 0 or i+1 == batch_count: 
      elapsed_time = time.time() - start_time 
      start_time = time.time() 
      print("Step: %d," % (step+1), 
       " Epoch: %2d," % (epoch+1), 
       " Batch: %3d of %3d," % (i+1, batch_count), 
       " Cost: %.4f," % cost, 
       " AvgTime: %3.2fms" % float(elapsed_time*1000/frequency)) 
      count = 0 

    print("Test-Accuracy: %2.2f" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels})) 
    print("Total Time: %3.2fs" % float(time.time() - begin_time)) 
    print("Final Cost: %.4f" % cost) 

    sv.stop() 
    print("done") 

я бегу выше код на моем три узла с инструкцией ниже в терминале:

pc-01$ python example.py --job-name="ps" --task_index=0 
pc-02$ python example.py --job-name="worker" --task_index=0 
pc-03$ python example.py --job-name="worker" --task_index=1 

Однако, после того, как переменные инициализируются, я встретил вопрос о том, что терминал работника всегда печати:

I tensor flow/core/distributed_runtime/master.cc:193] CreateSession still waiting for response from worker:/job:worker/replica:0/task:0 

и терминал ps не продолжаются. IP-адрес ps равен 192.168.1.102, а IP-адрес работника - 192.168.1.103,192.168.1.104, как и код выше. Кто-нибудь может мне помочь?

+0

Не зная IP-адреса ваших трех узлов, я не уверен в вашей проблеме. Я предлагаю, чтобы все задания вашего примера работали на одном узле и меняли хосты в вашем ClusterSpec на «localhost» для отладки. –

+0

IP ps - 192.168.1.102, а IP рабочего - 192.168.1.103,192.168.1.104. –

+0

Когда я запускаю свой пример на одном узле, он может получить точность, но получить результат, а затем другой принять. Но при работе на трех компьютерах все еще не работает. –

ответ

0

Я думаю, что отфильтрованное устройство должно помочь здесь. Не могли бы вы попробовать добавить device_filter в свою сессию?

 config = tf.ConfigProto(
     allow_soft_placement=True, 
     log_device_placement=False, 
     device_filters=["/job:ps", "/job:worker/task:%d" % FLAGS.task_index]) 

with sv.prepare_or_wait_for_session(server.target, config=\config) as sess: 

Это должно решить проблему.