TensorFlow multiprocessing; UnknownError: Could not start gRPC server



I am working on computing the Hessian matrix over a large dataset, and I am trying to run these computations in parallel across multiple CPUs. My current setup has 1 node with 10 CPUs, and I am using Python 2.7.

I wrote a small abstraction of the code to understand distributed TensorFlow better. Below is the error:

2017-07-23 16:16:17.281414: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:2225
Process Process-3:
Traceback (most recent call last):
  File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 32, in cifar10
    serv = tf.train.Server(cluster, job_name= params.job_name,task_index=params.task_index)
  File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 145, in __init__
    self._server_def.SerializeToString(), status)
  File "/home/skay/anaconda2/lib/python2.7/contextlib.py", line 24, in __exit__
    self.gen.next()
  File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 466, in raise_exception_on_not_ok_status
    pywrap_tensorflow.TF_GetCode(status))
UnknownError: Could not start gRPC server

I get this error every time I run the code. However, it then goes on to produce output for one of the two processes I set up, as shown below:

2017-07-23 16:27:48.605617: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session fe9fd6a338e2c9a7 with config: 
2017-07-23 16:27:48.607126: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session 3560417f98b00dea with config: 
[  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
Process-3
[  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
Process-3
[  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
Process-3

After that, it keeps waiting for the next one:

ERROR:tensorflow:==================================
Object was never used (type <class 'tensorflow.python.framework.ops.Operation'>):
<tf.Operation 'worker_0/init' type=NoOp>
If you want to mark it as used call its "mark_used()" method.
It was originally created here:
['File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 83, in <module>\n    proc.start()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 130, in start\n    self._popen = Popen(self)', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/forking.py", line 126, in __init__\n    code = process_obj._bootstrap()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap\n    self.run()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run\n    self._target(*self._args, **self._kwargs)', 'File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 49, in cifar10\n    init_op=tf.initialize_all_variables(),logdir=\'/tmp/mydir\')', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 170, in wrapped\n    return _add_should_use_warning(fn(*args, **kwargs))', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 139, in _add_should_use_warning\n    wrapped = TFShouldUseWarningWrapper(x)', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 96, in __init__\n    stack = [s.strip() for s in traceback.format_stack()]']
==================================
2017-07-23 16:28:28.646871: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-23 16:28:38.647276: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-23 16:28:48.647526: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica: 

I have 2 questions here:

  1. How do I fix this error regarding gRPC?
  2. I set up a multiprocessing queue "result" using Manager() and passed it to both workers when creating the processes. I expected each process to write its job ID to the queue once the condition is reached, but the queue always seems to contain only the process that finished last. Does this mean the queue is being overwritten somewhere by the other process?

[{'worker': 0}, {'worker': 0}]

Can I use a multiprocessing queue to share a dictionary between two TensorFlow sessions running in two different processes?
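For reference, the "last writer wins" behaviour from question 2 can be reproduced without TensorFlow at all: both workers write to the same key (the job name "worker") of a single Manager().dict() and then put that same proxied dict onto the queue, so every queue entry reflects whatever was stored last. A minimal sketch of that effect, with a hypothetical `worker` function standing in for `cifar10`:

    import multiprocessing

    def worker(task_index, return_dict, result_q):
        # every process writes to the SAME key, so the later write overwrites the earlier one
        return_dict['worker'] = task_index
        # the proxied dict itself goes onto the queue, not a snapshot of its contents
        result_q.put(return_dict)

    if __name__ == '__main__':
        manager = multiprocessing.Manager()
        result_q = manager.Queue()
        return_dict = manager.dict()
        procs = [multiprocessing.Process(target=worker, args=(i, return_dict, result_q))
                 for i in range(2)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        out = []
        while not result_q.empty():
            out.append(dict(result_q.get()))
        print out  # both entries show the same value, e.g. [{'worker': 1}, {'worker': 1}]
        # keying by task index, e.g. return_dict['worker_%d' % task_index] = task_index,
        # would keep one entry per process instead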

Below is my code:

# build a python multiprocess.py
import multiprocessing
import time
import tensorflow as tf
from tensorflow.contrib.training import HParams
import os
import psutil
import numpy as np
from tensorflow.python.client import device_lib
from resnet import *
import Queue

cluster_spec = {"ps": ["localhost:2226"],
                "worker": ["localhost:2227",
                           "localhost:2228"]}
cluster = tf.train.ClusterSpec(cluster_spec)
im_Test = np.linspace(1, 10, 10)

def model_fun(input):
    print multiprocessing.current_process().name
    return input

def cifar10(device, return_dict, result_t):
    params = HParams(cluster=cluster,
                     job_name=device[0],
                     task_index=device[1])
    serv = tf.train.Server(cluster, job_name=params.job_name, task_index=params.task_index)
    input_img = []
    true_lab = []
    if params.job_name == "ps":
        ## try and wait for all the workers
        serv.join()
    elif params.job_name == "worker":
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/replica:0/task:%d" % params.task_index,
                cluster=cluster)):
            # with tf.Graph().as_default(), tf.device('/cpu:%d' % params.task_index):
            # with tf.container('%s %d' % ('batchname', params.task_index)) as scope:
            input_img = tf.placeholder(dtype=tf.float32, shape=[10, ])
            with tf.name_scope('%s_%d' % (params.job_name, params.task_index)) as scope:
                hess_op = model_fun(input_img)
                global_step = tf.contrib.framework.get_or_create_global_step()
                sv = tf.train.Supervisor(is_chief=(params.task_index == 0),
                                         global_step=global_step,
                                         init_op=tf.initialize_all_variables(), logdir='/tmp/mydir')
                with sv.prepare_or_wait_for_session(serv.target) as sess:
                    step = 0
                    while not sv.should_stop():
                        hess = sess.run(hess_op, feed_dict={input_img: im_Test})
                        print(np.array(hess))
                        print multiprocessing.current_process().name
                        step += 1
                        if (step == 3):
                            return_dict[params.job_name] = params.task_index
                            result_t.put(return_dict)
                            break
                    sv.stop()
                    sess.close()

    return

if __name__ == '__main__':
    logger = multiprocessing.log_to_stderr()
    manager = multiprocessing.Manager()
    result = manager.Queue()
    return_dict = manager.dict()
    processes = []
    devices = [['ps', 0],
               ['worker', 0],
               ['worker', 1]]
    for i in (devices):
        start_time = time.time()
        proc = multiprocessing.Process(target=cifar10, args=(i, return_dict, result))
        processes.append(proc)
        proc.start()
    for p in processes:
        p.join()
    # print return_dict.values()
    kill = []
    while True:
        if result.empty() == True:
            break
        kill.append(result.get())
    print kill

    print("time taken = %d" % (time.time() - start_time))

In my case, I found that the ps raises this error and keeps waiting for a response when submitting a TensorFlowOnSpark job in yarn-cluster mode.

The ps error is as follows:

2018-01-17 11:08:46,366 INFO (MainThread-7305) Starting TensorFlow ps:0 on cluster node on background process
2018-01-17 11:08:56,085 INFO (MainThread-7395) 0: ======== ps:0 ========
2018-01-17 11:08:56,086 INFO (MainThread-7395) 0: Cluster spec: {'ps': ['172.16.5.30:33088'], 'worker': ['172.16.5.22:41428', '172.16.5.30:33595']}
2018-01-17 11:08:56,086 INFO (MainThread-7395) 0: Using CPU
2018-01-17 11:08:56.087452: I tensorflow/core/platform/cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA
E0117 11:08:56.088501182  7395 ev_epoll1_linux.c:1051] grpc epoll fd: 10
E0117 11:08:56.088860707  7395 server_chttp2.c:38] {"created":"@1516158536.088783549","description":"No address added out of total resolved","file":"external/grpc/src/core/ext/transport/chttp2/server/chttp2_server.c","file_line":245,"referenced_errors":[{"created":"@1516158536.088779164","description":"Failed to add any wildcard listeners","file":"external/grpc/src/core/lib/iomgr/tcp_server_posix.c","file_line":338,"referenced_errors":[{"created":"@1516158536.088771177","description":"Unable to configure socket","fd":12,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":200,"referenced_errors":[{"created":"@1516158536.088767669","description":"OS Error","errno":98,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":173,"os_error":"Address already in use","syscall":"bind"}]},{"created":"@1516158536.088778651","description":"Unable to configure socket","fd":12,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":200,"referenced_errors":[{"created":"@1516158536.088776541","description":"OS Error","errno":98,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":173,"os_error":"Address already in use","syscall":"bind"}]}]}]}
Process Process-2:
Traceback (most recent call last):
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000001/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 269, in wrapper_fn
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/pyfiles/mnist_dist.py", line 38, in map_fun
    cluster, server = ctx.start_cluster_server(1, args.rdma)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 56, in start_cluster_server
    return TFNode.start_cluster_server(self, num_gpus, rdma)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/tfspark.zip/tensorflowonspark/TFNode.py", line 110, in start_cluster_server
    server = tf.train.Server(cluster, ctx.job_name, ctx.task_index)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 145, in __init__
    self._server_def.SerializeToString(), status)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 473, in __exit__
    c_api.TF_GetCode(self.status.status))
UnknownError: Could not start gRPC server

The worker:1 log:

2018-01-17 11:09:14.614244: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:ps/replica:0/task:0

Then I checked the port on the ps server. Sure enough, the port was already in use.
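A quick way to confirm that kind of conflict is to try binding the port from the ClusterSpec yourself; the helper below is only an illustration (running `lsof -i :<port>` or `netstat -tlnp` on the ps host gives the same answer):

    import socket

    def port_in_use(host, port):
        # try to bind; a socket.error (EADDRINUSE) means another process already owns the port
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.bind((host, port))
        except socket.error:
            return True
        finally:
            s.close()
        return False

    print port_in_use('localhost', 2226)  # e.g. the ps port from the question's cluster_spec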

So resubmitting the job fixed the problem for me.

However, if you get this error every time you run the code, you should dig through more of the logs to find the cause.
