与多处理队列作斗争



我的结构(大量简化)如下所示:

import multiprocessing
def creator():
# creates files
return

def relocator():
# moves created files
return

create = multiprocessing.Process(target=creator)
relocate = multiprocessing.Process(target=relocator)
create.start()
relocate.start()

我正在尝试做的是让creator创建一堆文件,并在创建后立即将它们移动到另一个目录relocator.

我想在这里使用multiprocessing的原因是:

  • 我不希望creator先等搬家完成,因为搬家需要时间,我不想浪费。
  • 开始复制之前先创建所有文件也不是一种选择,因为驱动器中没有足够的空间容纳所有文件。

我希望creatorrelocator进程都是串行的(每个进程一次一个文件),但并行运行。操作的"日志"应该像这样:

# creating file 1
# creating file 2 and relocating file 1
# creating file 3 and relocating file 2
# ...
# relocating last file

根据我所读到的内容,Queue是要走的路。

策略:(也许不是最好的?!

创建文件后,它将进入队列,在完成重新定位后,它将从队列中删除。

但是我在编码时遇到了问题; 同时创建多个文件(并行运行的多个creator实例)和其他文件......

我将非常感谢任何想法,提示,解释等

让我们接受您的想法并拆分为此功能:

创建
  1. 者应创建文件(例如 100 个)

  2. 重新定位器应一次移动 1 个文件,直到没有更多要移动的文件

  3. 创建者可以在重新定位器之前结束,因此它也可以 把自己变成一个搬迁者两者都必须知道什么时候 完成

因此,我们有 2 个主要功能:

def create(i):
# creates files and return outpath
return os.path.join("some/path/based/on/stuff", "{}.ext".format(i))

def relocate(from, to):
# moves created files
shuttil.move(from, to)

现在让我们创建我们的流程:

from multiprocessing import Process, Queue
comm_queue = Queue()
#process that create the files and push the data into the queue
def creator(comm_q):
for i in range(100):
comm_q.put(create(i))
comm_q.put("STOP_FLAG") # we tell the workers when to stop, we just push one since we only have one more worker
#the relocator works till it gets an stop flag
def relocator(comm_q):
data = comm_q.get()
while data != "STOP_FLAG":
if data:
relocate(data, to_path_you_may_want)
data = comm_q.get()
creator_process= multiprocessing.Process(target=creator, args=(comm_queue))
relocators = multiprocessing.Process(target=relocator, args=(comm_queue))
creator_process.start()
relocators .start()

这样我们现在将拥有一个创建者和一个重新定位器,但是,假设现在我们希望创建者在创建工作完成时开始重新定位,我们可以只使用重新定位器,但我们需要再推一个"STOP_FLAG"因为我们会有 2 个进程重新定位

def creator(comm_q):
for i in range(100):
comm_q.put(create(i))
for _ in range(2):
comm_q.put("STOP_FLAG")
relocator(comm_q)

假设我们现在想要任意数量的重新定位器进程,我们应该稍微调整一下我们的代码来处理这个问题,我们需要creator方法来知道有多少标志通知其他进程何时停止,我们的结果代码如下所示:

from multiprocessing import Process, Queue, cpu_count
comm_queue = Queue()
#process that create the files and push the data into the queue
def creator(comm_q, number_of_subprocesses):
for i in range(100):
comm_q.put(create(i))
for _ in range(number_of_subprocesses + 1): # we need to count ourselves
comm_q.put("STOP_FLAG")
relocator(comm_q)
#the relocator works till it gets an stop flag
def relocator(comm_q):
data = comm_q.get()
while data != "STOP_FLAG":
if data:
relocate(data, to_path_you_may_want)
data = comm_q.get()
num_of_cpus = cpu_count() #we will spam as many processes as cpu core we have
creator_process= Process(target=creator, args=(comm_queue, num_of_cpus))
relocators = [Process(target=relocator, args=(comm_queue)) for _ in num_of_cpus]
creator_process.start()
for rp in relocators:
rp.start()

然后,您将不得不等待它们完成:

creator_process.join()
for rp in relocators:
rp.join()

您可能需要查看multiprocessing.Queue文档

特别是对get方法(默认是阻塞调用)

从队列中删除并返回项目。如果可选的参数块是 True(默认值),超时为 None (默认值),如果 在项目可用之前是必需的。

相关内容

  • 没有找到相关文章

最新更新