我的结构(大量简化)如下所示:
import multiprocessing
def creator():
# creates files
return
def relocator():
# moves created files
return
create = multiprocessing.Process(target=creator)
relocate = multiprocessing.Process(target=relocator)
create.start()
relocate.start()
我正在尝试做的是让creator
创建一堆文件,并在创建后立即将它们移动到另一个目录relocator
.
我想在这里使用multiprocessing
的原因是:
- 我不希望
creator
先等搬家完成,因为搬家需要时间,我不想浪费。
在 - 开始复制之前先创建所有文件也不是一种选择,因为驱动器中没有足够的空间容纳所有文件。
我希望creator
和relocator
进程都是串行的(每个进程一次一个文件),但并行运行。操作的"日志"应该像这样:
# creating file 1
# creating file 2 and relocating file 1
# creating file 3 and relocating file 2
# ...
# relocating last file
根据我所读到的内容,Queue
是要走的路。
策略:(也许不是最好的?!
创建文件后,它将进入队列,在完成重新定位后,它将从队列中删除。
但是我在编码时遇到了问题; 同时创建多个文件(并行运行的多个creator
实例)和其他文件......
我将非常感谢任何想法,提示,解释等
让我们接受您的想法并拆分为此功能:
创建者应创建文件(例如 100 个)
重新定位器应一次移动 1 个文件,直到没有更多要移动的文件
创建者可以在重新定位器之前结束,因此它也可以 把自己变成一个搬迁者两者都必须知道什么时候 完成
因此,我们有 2 个主要功能:
def create(i):
# creates files and return outpath
return os.path.join("some/path/based/on/stuff", "{}.ext".format(i))
def relocate(from, to):
# moves created files
shuttil.move(from, to)
现在让我们创建我们的流程:
from multiprocessing import Process, Queue
comm_queue = Queue()
#process that create the files and push the data into the queue
def creator(comm_q):
for i in range(100):
comm_q.put(create(i))
comm_q.put("STOP_FLAG") # we tell the workers when to stop, we just push one since we only have one more worker
#the relocator works till it gets an stop flag
def relocator(comm_q):
data = comm_q.get()
while data != "STOP_FLAG":
if data:
relocate(data, to_path_you_may_want)
data = comm_q.get()
creator_process= multiprocessing.Process(target=creator, args=(comm_queue))
relocators = multiprocessing.Process(target=relocator, args=(comm_queue))
creator_process.start()
relocators .start()
这样我们现在将拥有一个创建者和一个重新定位器,但是,假设现在我们希望创建者在创建工作完成时开始重新定位,我们可以只使用重新定位器,但我们需要再推一个"STOP_FLAG"
因为我们会有 2 个进程重新定位
def creator(comm_q):
for i in range(100):
comm_q.put(create(i))
for _ in range(2):
comm_q.put("STOP_FLAG")
relocator(comm_q)
假设我们现在想要任意数量的重新定位器进程,我们应该稍微调整一下我们的代码来处理这个问题,我们需要creator
方法来知道有多少标志通知其他进程何时停止,我们的结果代码如下所示:
from multiprocessing import Process, Queue, cpu_count
comm_queue = Queue()
#process that create the files and push the data into the queue
def creator(comm_q, number_of_subprocesses):
for i in range(100):
comm_q.put(create(i))
for _ in range(number_of_subprocesses + 1): # we need to count ourselves
comm_q.put("STOP_FLAG")
relocator(comm_q)
#the relocator works till it gets an stop flag
def relocator(comm_q):
data = comm_q.get()
while data != "STOP_FLAG":
if data:
relocate(data, to_path_you_may_want)
data = comm_q.get()
num_of_cpus = cpu_count() #we will spam as many processes as cpu core we have
creator_process= Process(target=creator, args=(comm_queue, num_of_cpus))
relocators = [Process(target=relocator, args=(comm_queue)) for _ in num_of_cpus]
creator_process.start()
for rp in relocators:
rp.start()
然后,您将不得不等待它们完成:
creator_process.join()
for rp in relocators:
rp.join()
您可能需要查看multiprocessing.Queue
文档
特别是对get
方法(默认是阻塞调用)
从队列中删除并返回项目。如果可选的参数块是 True(默认值),超时为 None (默认值),如果 在项目可用之前是必需的。