我创建主模型的多个副本,每个副本在不同的过程中,以便分别获取每个副本的梯度并将它们全部应用于主模型。(我现在只用一个子进程(工人(测试它:
num_workers = 1
master = ACNetwork() # Creates the network
envs = [gym.make('CartPole-v0') for i in range(num_workers)]
# the environment that gives me data to train on
workers = [Worker(number=i, environment=envs[i], master_network=master,
counter=counter) for i in range(num_workers)]
for worker in workers:
worker.start()
我的问题是我在父进程中创建主模型,将其作为参数传递给每个子进程,并且获取权重会引发值错误:
# The worker executes
def run(self):
with tf.Session(graph=tf.Graph()) as sess:
self.private_net = ACNetwork()
.......
a_grads, c_grads = self.private_net.get_gradients()
self.master.update_from_gradients(a_grads, c_grads)
# now inside the master network
def update_from_gradients(self, actor_gradients, critic_gradients):
grads_and_vars = list(zip(actor_gradients, self.actor_t.get_weights()))
# get_weights raises the error
train_op = self.actor_opt.apply_gradients(grads_and_vars)
提高:
ValueError: Tensor Tensor("dense_4/kernel:0", shape=(4, 512), dtype=float32_ref) is not an element of this graph
根据我的理解,更新权重的代码是在不需要主网络的图形中执行的,因此会引发错误。如何保留主网络的图形,以便我可以在子进程的上下文中更新它?
您必须在每个流程中创建模型并设置相等的权重,并在不关闭流程的情况下将模型保留在那里。您需要控制它们之间的列车流,使进程等待主线程,反之亦然。这可能太难了,还有其他选择。
你不需要流程来传递并行批处理,你可以创建一个并行模型:
mainModel = ....
inputs = []
outputs = []
for i in range(num_workers):
inp = Input(input_shape)
out = mainModel(inp)
inputs.append(inp)
outputs.append(out)
parallelModel = Model(inputs, outputs)
使用num_workers
不同的数据组进行训练:
parallelModel.fit(
[xTrain1, xTrain2,...],
[yTrain1, yTrain2,...]
)
如果您使用的是预先执行,即使没有并行模型或其他过程,您也可以传递num_workers
批次,计算它们的梯度,对它们的梯度求和,最后应用梯度。
如果您确实想在不创建建议的并行模型的情况下使用并行处理,则可能应该使用multiprocessing.dummy
并在主线程中保留一个模型,仅使用 worker 来传递数据和获取梯度。
现在,更简单的是,使用num_workers
倍于每个并行批次大小的批大小将产生相同的结果。