Tensorflow distributed : CreateSession 仍在等待来自 worker 的响应:/jo



我正在尝试使用 TF 运行我的第一个分布式训练示例。我使用了 TF 文档中的示例 https://www.tensorflow.org/deploy/distributed 在不同的集群上各有一个 ps 和一个工作线程。但是,我总是在工作线程集群上获得CreateSession still waiting for response from worker: /job:ps/replica:0/task:0,如下所示!

trainer.py

import argparse
import sys
import tensorflow as tf
FLAGS = None

def main(_):
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# Create and start a server for the local task.
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
# input images
with tf.name_scope('input'):
# None -> batch size can be any size, 784 -> flattened mnist image
x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
# target 10 output classes
y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")
# model parameters will change during training so we use tf.Variable
tf.set_random_seed(1)
with tf.name_scope("weights"):
W1 = tf.Variable(tf.random_normal([784, 100]))
W2 = tf.Variable(tf.random_normal([100, 10]))
# bias
with tf.name_scope("biases"):
b1 = tf.Variable(tf.zeros([100]))
b2 = tf.Variable(tf.zeros([10]))
# implement model
with tf.name_scope("softmax"):
# y is our prediction
z2 = tf.add(tf.matmul(x,W1),b1)
a2 = tf.nn.sigmoid(z2)
z3 = tf.add(tf.matmul(a2,W2),b2)
y  = tf.nn.softmax(z3)
# specify cost function
with tf.name_scope('cross_entropy'):
# this is our cost
loss = tf.reduce_mean(
-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
global_step = tf.contrib.framework.get_or_create_global_step()
train_op = tf.train.AdagradOptimizer(0.01).minimize(
loss, global_step=global_step)
# The StopAtStepHook handles stopping after running given steps.
hooks=[tf.train.StopAtStepHook(last_step=1000000)]
# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(master=server.target,
is_chief=(FLAGS.task_index == 0),
checkpoint_dir="/tmp/train_logs",
hooks=hooks) as mon_sess:
while not mon_sess.should_stop():
# Run a training step asynchronously.
# See <a href="../api_docs/python/tf/train/SyncReplicasOptimizer"><code>tf.train.SyncReplicasOptimizer</code></a> for additional details on how to
# perform *synchronous* training.
# mon_sess.run handles AbortedError in case of preempted PS.
mon_sess.run(train_op)

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.register("type", "bool", lambda v: v.lower() == "true")
# Flags for defining the tf.train.ClusterSpec
parser.add_argument(
"--ps_hosts",
type=str,
default="",
help="Comma-separated list of hostname:port pairs"
)
parser.add_argument(
"--worker_hosts",
type=str,
default="",
help="Comma-separated list of hostname:port pairs"
)
parser.add_argument(
"--job_name",
type=str,
default="",
help="One of 'ps', 'worker'"
)
# Flags for defining the tf.train.Server
parser.add_argument(
"--task_index",
type=int,
default=0,
help="Index of task within the job"
)
FLAGS, unparsed = parser.parse_known_args()
tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

PS 集群

$ python trainer.py --ps_hosts=<ps-ipaddress>:2222 --worker_hosts=<worker-ipaddress>:2222 --job_name=ps --task_index=0
2018-07-06 21:52:34.495508: I tensorflow/core/platform/cpu_feature_guard.cc:140] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2018-07-06 21:52:34.802537: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1356] Found device 0 with properties: 
name: Tesla K80 major: 3 minor: 7 memoryClockRate(GHz): 0.8235
pciBusID: 0000:05:00.0
totalMemory: 11.17GiB freeMemory: 6.98GiB
2018-07-06 21:52:35.129511: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1356] Found device 1 with properties: 
name: Tesla K80 major: 3 minor: 7 memoryClockRate(GHz): 0.8235
pciBusID: 0000:06:00.0
totalMemory: 11.17GiB freeMemory: 6.98GiB
2018-07-06 21:52:35.130066: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1435] Adding visible gpu devices: 0, 1
2018-07-06 21:52:36.251900: I tensorflow/core/common_runtime/gpu/gpu_device.cc:923] Device interconnect StreamExecutor with strength 1 edge matrix:
2018-07-06 21:52:36.252045: I tensorflow/core/common_runtime/gpu/gpu_device.cc:929]      0 1 
2018-07-06 21:52:36.252058: I tensorflow/core/common_runtime/gpu/gpu_device.cc:942] 0:   N Y 
2018-07-06 21:52:36.252067: I tensorflow/core/common_runtime/gpu/gpu_device.cc:942] 1:   Y N 
2018-07-06 21:52:36.252770: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1053] Created TensorFlow device (/job:ps/replica:0/task:0/device:GPU:0 with 6754 MB memory) -> physical GPU (device: 0, name: Tesla K80, pci bus id: 0000:05:00.0, compute capability: 3.7)
2018-07-06 21:52:36.357351: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1053] Created TensorFlow device (/job:ps/replica:0/task:0/device:GPU:1 with 6754 MB memory) -> physical GPU (device: 1, name: Tesla K80, pci bus id: 0000:06:00.0, compute capability: 3.7)
2018-07-06 21:52:36.468733: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:2222}
2018-07-06 21:52:36.468788: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job target -> {0 -> <ps-ipaddress>:2222}
2018-07-06 21:52:36.468801: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> <worker-ipaddress>:2222}
2018-07-06 21:52:36.506840: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:332] Started server with target: grpc://localhost:2222

和工人

$ python trainer.py --ps_hosts=<ps-ipaddress>:2222 --worker_hosts=<worker-ipaddress>:2222 --job_name=worker --task_index=0
2018-07-06 21:55:13.276064: I tensorflow/core/platform/cpu_feature_guard.cc:140] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2018-07-06 21:55:17.948796: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1356] Found device 0 with properties: 
name: Tesla K80 major: 3 minor: 7 memoryClockRate(GHz): 0.8235
pciBusID: 0000:05:00.0
totalMemory: 11.17GiB freeMemory: 11.10GiB
2018-07-06 21:55:18.082286: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1356] Found device 1 with properties: 
name: Tesla K80 major: 3 minor: 7 memoryClockRate(GHz): 0.8235
pciBusID: 0000:06:00.0
totalMemory: 11.17GiB freeMemory: 11.10GiB
2018-07-06 21:55:18.082538: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1435] Adding visible gpu devices: 0, 1
2018-07-06 21:55:18.591166: I tensorflow/core/common_runtime/gpu/gpu_device.cc:923] Device interconnect StreamExecutor with strength 1 edge matrix:
2018-07-06 21:55:18.591218: I tensorflow/core/common_runtime/gpu/gpu_device.cc:929]      0 1 
2018-07-06 21:55:18.591227: I tensorflow/core/common_runtime/gpu/gpu_device.cc:942] 0:   N Y 
2018-07-06 21:55:18.591232: I tensorflow/core/common_runtime/gpu/gpu_device.cc:942] 1:   Y N 
2018-07-06 21:55:18.591751: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1053] Created TensorFlow device (/job:worker/replica:0/task:0/device:GPU:0 with 10764 MB memory) -> physical GPU (device: 0, name: Tesla K80, pci bus id: 0000:05:00.0, compute capability: 3.7)
2018-07-06 21:55:18.696213: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1053] Created TensorFlow device (/job:worker/replica:0/task:0/device:GPU:1 with 10764 MB memory) -> physical GPU (device: 1, name: Tesla K80, pci bus id: 0000:06:00.0, compute capability: 3.7)
2018-07-06 21:55:18.801080: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> <ps-ipaddress>:2222}
2018-07-06 21:55:18.801134: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2222}
2018-07-06 21:55:18.809115: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:332] Started server with target: grpc://localhost:2222
WARNING:tensorflow:From mnist_distributed.py:62: get_or_create_global_step (from tensorflow.contrib.framework.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Please switch to tf.train.get_or_create_global_step
2018-07-06 21:55:31.532416: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:ps/replica:0/task:0
2018-07-06 21:55:41.532559: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:ps/replica:0/task:0

<ps-ipaddress><worker-ipaddress>都将替换为实际地址。 不确定这是否是问题,但这些地址位于远程群集中。例如,为了连接到这些集群,我必须在那里ssh,所以我正在做额外的登录步骤,这应该是TF的问题吗?

更新: 我发现当我对ps和worker使用同一台机器时,只有当我为网络地址提供路径localhost:PORT_NUM时,它才能正常工作。但是,当我使用用于ssh登录的实际地址时,它不起作用!

好吧,这个问题有点傻!我使用user@example.comssh 到远程服务器,我用来在 Tensorflow 中定义集群 IP 地址,但事实证明我应该只使用example.com之后问题就解决了!

我发现其他人建议的另一件事是任务 ID 应与集群 IP 地址匹配。为简单起见,请尝试使用一个 ps 和一个在同一台计算机上的工作线程,看看它是否适合您。

最新更新