我想使用多个进程(not threads
(进行一些预处理并将结果排队到tf。RandomShuffleQueue,可以由我的主图用于训练。
有没有办法做到这一点?
我的实际问题
我已经将我的数据集转换为 TFRecord,分为 256 个分片。我想使用 multiprocessing
启动 20 个进程,并让每个进程都有一系列分片。每个进程都应该读取图像,然后增强它们并将它们推送到一个tf.RandomShuffleQueue
中,从中可以将输入提供给图形进行训练。
有人建议我在tensorflow
中通过inception
的例子。然而,这是一个非常不同的情况,因为只有数据分片的读取是由多个线程(not processes
(完成的,而预处理(例如 - 增强(发生在主线程中。
(这旨在解决您的实际问题(
在另一个主题中,有人告诉你 Python 具有全局解释器锁 (GIL(,因此除非您使用多个进程,否则多核不会带来速度优势。
这可能就是促使你渴望使用multiprocessing
的原因。
但是,对于TF,Python通常仅用于构造"图"。实际执行发生在本机代码(或 GPU(中,其中 GIL 没有任何作用。
鉴于此,我建议简单地让 TF 使用多线程。这可以使用 intra_op_parallelism_threads
参数进行控制,例如:
with tf.Session(graph=graph,
config=tf.ConfigProto(allow_soft_placement=True,
intra_op_parallelism_threads=20)) as sess:
# ...
(旁注:如果你有一个,比如说,一个 2 CPU、32 核的系统,最好的论点很可能是 intra_op_parallelism_threads=16
,这取决于很多因素(
评论:TFRecords的腌制并不那么重要。 我可以传递包含分片 TFRecord 文件范围名称的列表列表。
我必须重新启动决策过程!
注释:我可以将其作为参数传递给 Pool.map((。
验证multiprocesing.Queue()
是否可以处理此问题。
张量函数的结果是一个Tensor object
。
请尝试以下操作:
tensor_object = func(TFRecord)
q = multiprocessing.Manager().Queue()
q.put(tensor_object)
data = q.get()
print(data)
评论:如何确保所有进程都排队到同一个队列?
这很简单enqueue
Pool.map(...
的结果完成 毕竟process
完了。
或者,我们可以enqueue
并行,queueing
来自所有processes
的数据。
但这样做取决于如上所述的泡菜数据。
例如:
import multiprocessing as mp
def func(filename):
TFRecord = read(filename)
tensor_obj = tf.func(TFRecord)
return tensor_obj
def main_Tensor(tensor_objs):
tf = # ... instantiat Tensor Session
rsq = tf.RandomShuffleQueue(...)
for t in tensor_objs:
rsq.enqueue(t)
if __name__ == '__main__':
sharded_TFRecords = ['file1', 'file2']
with mp.Pool(20) as pool:
tensor_objs = pool.map(func, sharded_TFRecords)
pool.join()
main_Tensor(tensor_objs)
推荐的TF
运行multiprocessing
方法是为每个孩子创建一个单独的tf.Session
,因为跨进程共享它是不可行的。
你可以看看这个例子,希望它有所帮助。
[编辑:旧答案]
您可以使用multiprocessing.Pool
并依靠其回调机制在结果准备就绪后立即将其放入tf.RandomShuffleQueue
中。
这是一个非常简单的示例,说明如何做到这一点。
from multiprocessing import Pool
class Processor(object):
def __init__(self, random_shuffle_queue):
self.queue = random_shuffle_queue
self.pool = Pool()
def schedule_task(self, task):
self.pool.apply_async(processing_function, args=[task], callback=self.task_done)
def task_done(self, results):
self.queue.enqueue(results)
这假设 Python 2,对于 Python 3,我建议使用 concurrent.futures.ProcessPoolExecutor
。