TensorFlow:如何在训练期间多次评估验证数据队列



tl;博士

如何在每次 K 次训练迭代后评估验证集,对训练和验证数据使用单独的队列,而无需在多个进程中诉诸单独的tf.Sessions?鉴于我的特定问题,似乎没有一种干净的方法来实现这一目标,并且我当前的解决方法(我认为会起作用)给了我未定义的行为。帮助!

整个故事

我想每 K 次训练迭代评估一个验证集,但我无法弄清楚如何在 TensorFlow 中正确实现这一点。这应该是最常见的操作之一,但它感觉 TensorFlow 的 API/架构在这里对我不利,或者至少让事情变得不必要的困难。

我的假设是:

  • [A1] 此处描述的用于训练/验证的多进程模型 https://www.tensorflow.org/how_tos/reading_data/#multiple_input_pipelines 不适用于我的问题,因为我必须假设没有足够的 GPU 内存可用于加载变量两次。
  • [A2] 我想在验证集上评估每 K 次训练迭代。
  • [A3] 训练和验证数据不能简单地从磁盘读取,而是动态生成的。这使得无法提前可靠地预先计算验证集的大小。
  • [A4] 验证集太大,无法预先计算并存储到磁盘上。
  • [A5] 有效验证集大小不一定是批大小的倍数。

训练输入管道的设置如下:

  • tf.train.slice_input_producer()生成一个(随机的)文件名列表,每个文件名都引用原始输入数据。
  • 自定义数据生成函数从每个原始输入数据块生成可变数量的训练示例/标签。
  • 生成的训练样本/标签在输入网络之前通过tf.train.shuffle_batch()排队。

由于 [A3]、[A4]、[A5],验证输入管道的设置方式几乎相同,只是最终的输入队列是通过tf.train.batch()生成的,因为不需要洗牌。由于上述假设,基于feed_dict的方法也是不可行的,并且似乎与使用更高级别的功能(如tf.train.batch)不兼容。

但是,使用两组不同的队列进行训练和验证的简单实现不起作用。据我了解,我有两个选择:

  • [B1] 将验证tf.train.slice_input_producernum_epochs参数设置为None

    在这种情况下,验证集会无休止地循环,但我需要提前知道验证集的大小,以明确限制每次通过验证集要评估的批次数。此外,如果验证集大小不能被批大小整除,我总是会在最后一批中拉动更多。由于每次都会改变验证数据的评估顺序,因此这是不可接受的。

  • [B2] 将验证tf.train.slice_input_producernum_epochs参数设置为1,另外将tf.train.batch函数的allow_smaller_final_batch参数设置为True

    在这种情况下,验证集只循环一次,之后相应的队列将永久关闭。默认情况下,这将使评估验证集两次或更多次变得不可能。由于我不知道在 TensorFlow 中重新打开队列的好方法,我需要解决此限制。

由于选项 [B1] 的局限性更大,我选择解决选项 [B2] 的问题。概述我当前方法的(伪)代码如下:

训练循环应该是相当规范的。每 K 次迭代,就会调用一个用于评估验证集的函数。 请注意,我只启动名称以"train_"开头的队列;这些是为收集生成的训练数据而设置的队列。为此,我创建了两个帮助程序函数,get_queues_by_namestart_queue_runners

def train_loop(train_ops, vali_ops, ...):
with tf.Session() as sess:
coord = tf.train.Coordinator()
sess.run([tf.initialize_all_variables(), tf.initialize_local_variables()])
load_latest_snapshot(sess, loader, snapshot_file)
# Launch the queue runners
queues = get_queues_by_name("train")
threads = start_queue_runners(sess, coord, queues)
try:
for step in range(start_iteration, num_train_iterations):
# Runs the session on validation set
if step % K == 0:
validation_results = run_validation(vali_ops, snapshot_file)
# TRAINING:
# ...
except Exception as e:
coord.request_stop(e)
finally:
coord.request_stop()
coord.join(threads)

帮助程序函数如下所示:

def get_queues_by_name(name):
"""Retrieves all queues that contain the string given by 'name'"""
all_queues = tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
return [q for q in all_queues if name in q.name]

def start_queue_runners(session, coordinator, queues):
"""Similar to tf.train.start_queue_runners but now accepts a list of queues instead of a graph collection"""
with session.graph.as_default():
threads = []
for queue in queues:
log("Queue", "Starting queue '%s'" % queue.name, level=2)
threads.extend(queue.create_threads(session, coordinator, daemon=True, start=True))
return threads

run_validation函数中,针对关闭队列的问题,我选择的解决方法是创建一个新tf.Session。我也只启动与收集验证集数据的队列关联的线程。

def run_validation(ops, snapshot_file):  # Called inside train_loop()
results = None
loader = tf.train.Saver()
with tf.Session() as sess:
coord = tf.train.Coordinator()
sess.run([tf.initialize_local_variables()])
load_latest_snapshot(sess, loader, snapshot_file)
# Launch the queue runners
queues = get_queues_by_name("eval")
threads = start_queue_runners(sess, coord, queues)
# Performs the inference in batches
try:
# Evaluate validation set:
results = eval_in_batches(ops, sess)
except Exception as e:
coord.request_stop(e)
finally:
coord.request_stop()
coord.join(threads)
return results

我不知道在这里创建新tf.Session是否是一个好主意,但这似乎是完成重新启动验证队列的唯一方法。理想情况下,我也不想重新加载模型快照,因为这在概念上似乎是不必要的。

这段代码的问题在于,我在运行过程中看到不稳定/未定义的行为,例如在验证集评估期间,NaN 或 Inf 出现在网络内部。这似乎主要发生在验证集队列被填充的同时,训练集队列仍在填充时(因为在验证集评估期间训练队列处于打开状态)。例如,如果我在迭代 0 处评估验证集(当两个队列仍需要填充时),通常会发生这种情况。训练/验证队列似乎共享某种全局状态,尽管它们在不同的会话中运行。

有人可以解释为什么会发生这种情况,以及如何在考虑上述假设 [A1]-[A5] 的同时更干净地解决这个问题?

我目前面临类似的问题。到目前为止,我完全避免了任何队列,只是通过feed_dict输入数据,但我显然因为不使用队列和并行性而失去了一些性能(尽管我仍然对当前的速度感到满意,因为我之前在 Theano 中做了同样的事情)。现在我想重新设计它并使用队列并偶然发现这个问题。有这个,这个,这个相关的问题。

我目前正在考虑这样做:

  • 在训练中,我想使用一个RandomShuffleQueue,这使得它更加复杂。我想我会忽略这个问题,一旦将张量排队进入队列的读取器线程完成,我就会让训练停止,所以我会丢失这个纪元剩余的最多capacity个项目,只把它用于下一个纪元。也许为了使其具有确定性,我检查了我仍然从队列中读取的训练线程,直到只剩下min_after_dequeue个项目。

  • 在评估中,我想使用相同的图形和相同的会话。我可以使用tf.cond从另一个单独的队列而不是RandomShuffleQueue读取。或者我可以在评估中使用feed_dict。如果我使用单独的队列,我会使用FIFOQueue并仔细跟踪我是否执行了正确数量的步骤。我还可以引入另一个虚拟张量,我将其排队到队列中,它给了我一个end_of_epoch标志左右,这样我就可以在 eval-thread 中知道何时停止。


在 TensorFlow 1.2 中,将有tf.contrib.data接口(问题注释、文档概述、API 文档),它提供了tf.contrib.data.DatasetAPI,该接口还支持类似于tf.RandomShuffleQueue的洗牌和批处理以及多个纪元的循环。此外,您可以通过在数据上创建迭代器来访问数据,并且可以重置迭代器。一些相关的StackOverflow问题在这里和这里。

最新更新