正在排队 tf.RandomShuffle使用多处理从多个进程排队



我想使用多个进程(not threads(进行一些预处理并将结果排队到tf。RandomShuffleQueue,可以由我的主图用于训练。

有没有办法做到这一点?

我的实际问题

我已经将我的数据集转换为 TFRecord,分为 256 个分片。我想使用 multiprocessing 启动 20 个进程,并让每个进程都有一系列分片。每个进程都应该读取图像,然后增强它们并将它们推送到一个tf.RandomShuffleQueue中,从中可以将输入提供给图形进行训练。

有人建议我在tensorflow中通过inception的例子。然而,这是一个非常不同的情况,因为只有数据分片的读取是由多个线程(not processes(完成的,而预处理(例如 - 增强(发生在主线程中。

(这旨在解决您的实际问题(

在另一个主题中,有人告诉你 Python 具有全局解释器锁 (GIL(,因此除非您使用多个进程,否则多核不会带来速度优势。

这可能就是促使你渴望使用multiprocessing的原因。

但是,对于TF,Python通常仅用于构造"图"。实际执行发生在本机代码(或 GPU(中,其中 GIL 没有任何作用。

鉴于此,我建议简单地让 TF 使用多线程。这可以使用 intra_op_parallelism_threads 参数进行控制,例如:

with tf.Session(graph=graph, 
    config=tf.ConfigProto(allow_soft_placement=True, 
    intra_op_parallelism_threads=20)) as sess:
    # ...

(旁注:如果你有一个,比如说,一个 2 CPU、32 核的系统,最好的论点很可能是 intra_op_parallelism_threads=16 ,这取决于很多因素(

评论:TFRecords的腌制并不那么重要。 我可以传递包含分片 TFRecord 文件范围名称的列表列表。

我必须重新启动决策过程!

注释:我可以将其作为参数传递给 Pool.map((。

验证multiprocesing.Queue()是否可以处理此问题。
张量函数的结果是一个Tensor object
请尝试以下操作:

tensor_object = func(TFRecord)
q = multiprocessing.Manager().Queue()
q.put(tensor_object)
data = q.get()
print(data)

评论:如何确保所有进程都排队到同一个队列?

这很简单enqueue Pool.map(...的结果完成 毕竟process完了。
或者,我们可以enqueue并行,queueing来自所有processes的数据。

但这样做取决于如上所述的泡菜数据。


例如:

import multiprocessing as mp
def func(filename):
    TFRecord = read(filename)
    tensor_obj = tf.func(TFRecord)
    return tensor_obj
def main_Tensor(tensor_objs):
    tf = # ... instantiat Tensor Session
    rsq = tf.RandomShuffleQueue(...)
    for t in tensor_objs:
        rsq.enqueue(t)
if __name__ == '__main__':
    sharded_TFRecords = ['file1', 'file2']
    with mp.Pool(20) as pool:
        tensor_objs = pool.map(func, sharded_TFRecords)
        pool.join()
    main_Tensor(tensor_objs)
似乎

推荐的TF运行multiprocessing方法是为每个孩子创建一个单独的tf.Session,因为跨进程共享它是不可行的。

你可以看看这个例子,希望它有所帮助。

[编辑:旧答案]

您可以使用multiprocessing.Pool并依靠其回调机制在结果准备就绪后立即将其放入tf.RandomShuffleQueue中。

这是一个非常简单的示例,说明如何做到这一点。

from multiprocessing import Pool

class Processor(object):
    def __init__(self, random_shuffle_queue):
        self.queue = random_shuffle_queue
        self.pool = Pool()
    def schedule_task(self, task):
        self.pool.apply_async(processing_function, args=[task], callback=self.task_done)
    def task_done(self, results):
        self.queue.enqueue(results)

这假设 Python 2,对于 Python 3,我建议使用 concurrent.futures.ProcessPoolExecutor

相关内容

  • 没有找到相关文章

最新更新