Processing pool results without waiting for all tasks to complete


from multiprocessing import Pool
from functools import partial
from time import sleep
import random
import string
import uuid
import os
import glob

def task_a(param1, param2, mydata):
    thread_id = str(uuid.uuid4().hex)   # this may not be robust enough to guarantee no collisions, address
    output_filename = ''.join([str(thread_id), '.txt'])
    # part 1 - create output file for task_b to use
    with open(output_filename, 'w') as outfile:
        for line in mydata:
            outfile.write(line)
    # part 2 - do some extra stuff (whilst task_b is running)
    sleep(5)
    print('Task A finished')
    return output_filename # not interested in return val

def task_b(expected_num_files):
    processed_files = 0
    while processed_files < expected_num_files:
        print('I am task_b, waiting for {} files ({} so far)'.format(expected_num_files, processed_files))
        path_to_search = ''
        for filename in glob.iglob(path_to_search + '*.txt', recursive=True):
            print('Got file : {}'.format(filename))
            # would do something complicated here
            os.rename(filename, filename + '.done')
            processed_files += 1
        sleep(10)

if __name__ == '__main__':
    param1 = ''     # dummy variable, need to support in solution
    param2 = ''     # dummy variable, need to support in solution
    num_workers = 2
    full_data = [[random.choice(string.ascii_lowercase) for _ in range(5)] for _ in range(100)]
    print(full_data)
    for i in range(0, len(full_data), num_workers):
        print('Going to process {}'.format(full_data[i:i+num_workers]))
        p = Pool(num_workers)
        task_a_func = partial(task_a, param1, param2)
        results = p.map(task_a_func, full_data[i:i+num_workers])
        p.close()
        p.join()
        task_b(expected_num_files=num_workers) # want this running sooner
        print('Iteration {} complete'.format(i))
    # want to wait for task_a's and task_b to finish

I'm having trouble arranging for these tasks to run at the same time.

task_a runs in a multiprocessing pool and produces an output file partway through its execution.

task_b has to process the output files serially, in any order, as soon as they become available, whilst task_a continues running (task_a will not change the output files again).

The next iteration can only start once all the task_a runs have finished and task_b has finished.

The toy code I've posted obviously waits for task_a to finish completely before task_b starts (which is not what I want).

I've looked at multiprocessing / subprocess etc. but can't find a way to launch the pool and the single task_b process at the same time and then wait for both to finish.

task_b is written as if it could be swapped out for an external script, but I'm still stuck on how to manage the execution.

Should I effectively merge the task_b code into task_a and somehow pass a flag so that one worker in each pool "runs the task_b code" via an if/else, so that at least I could just wait for the pool to finish?

You can use an inter-process queue to pass the filenames from task a to task b.

Also, re-creating the pool on every loop iteration is harmful and unnecessarily slow; it's better to initialise the pool once at the start.
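To make the hand-off concrete before the full solution, here is a minimal sketch of just the queue pattern; the function names produce/consume, the pool size and the dummy data are illustrative, not part of the original code. A pool worker puts each filename onto a Manager().Queue() as soon as the file is written, and a consumer (itself running as an extra pool worker) pulls filenames off the queue until it sees a sentinel value.

from multiprocessing import Pool, Manager
import uuid

def produce(queue, mydata):
    # pool worker: write a file, then announce it on the queue immediately
    filename = uuid.uuid4().hex + '.txt'
    with open(filename, 'w') as f:
        f.writelines(mydata)
    queue.put(filename)

def consume(queue):
    # consumer: handle each file as soon as it is announced, stop on the sentinel
    while True:
        filename = queue.get()
        if filename == 'QUIT':
            break
        print('Processing {}'.format(filename))

if __name__ == '__main__':
    mgr = Manager()
    queue = mgr.Queue()
    with Pool(3) as p:
        consumer = p.apply_async(consume, (queue,))                 # one worker reserved for the consumer
        p.starmap(produce, [(queue, ['a', 'b']), (queue, ['c', 'd'])])
        queue.put('QUIT')                                            # no more files are coming
        consumer.wait()

The full solution below applies the same pattern, adding an Event so the main loop can wait until the current batch of files has been processed before dispatching the next one.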

from multiprocessing import Pool, Manager, Event
from functools import partial
from time import sleep
import random
import string
import uuid
import os
import glob

def task_a(param1, param2, queue, mydata):
    thread_id = str(uuid.uuid4().hex)
    output_filename = ''.join([str(thread_id), '.txt'])
    output_filename = 'data/' + output_filename
    with open(output_filename, 'w') as outfile:
        for line in mydata:
            outfile.write(line)
    print(f'{thread_id}: Task A file write complete for data {mydata}')
    queue.put(output_filename)
    print('Task A finished')

def task_b(queue, num_workers, data_size, event_task_b_done):
    print('Task b started!')
    processed_files = 0
    while True:
        filename = queue.get()
        if filename == 'QUIT':
            # Whenever you want task_b to quit, just push 'QUIT' to the queue
            print('Task b quitting')
            break
        print('Got file : {}'.format(filename))
        os.rename(filename, filename + '.done')
        processed_files += 1
        print(f'Have processed {processed_files} so far!')
        if (processed_files % num_workers == 0) or (processed_files == data_size):
            event_task_b_done.set()

if __name__ == '__main__':
    param1 = ''     # dummy variable, need to support in solution
    param2 = ''     # dummy variable, need to support in solution

    num_workers = 2
    data_size = 100
    full_data = [[random.choice(string.ascii_lowercase) for _ in range(5)] for _ in range(data_size)]
    os.makedirs('data', exist_ok=True)  # task_a writes into data/, so make sure it exists
    mgr = Manager()
    queue = mgr.Queue()
    event_task_b_done = mgr.Event()
    # One extra worker for task b
    p = Pool(num_workers + 1)
    p.apply_async(task_b, args=(queue, num_workers, data_size, event_task_b_done))
    task_a_func = partial(task_a, param1, param2, queue)
    for i in range(0, len(full_data), num_workers):
        data = full_data[i:i+num_workers]
        print('Going to process {}'.format(data))
        p.map_async(task_a_func, data)
        print(f'Waiting for task b to process all {num_workers} files...')
        event_task_b_done.wait()
        event_task_b_done.clear()
        print('Iteration {} complete'.format(i))
    queue.put('QUIT')
    p.close()
    p.join()
    exit(0)
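As an aside, and not part of the answer above: if the per-file task_b work can live in the main process instead of a dedicated worker, concurrent.futures offers another way to consume pool results without waiting for every task, because as_completed yields each future as soon as its worker finishes. A minimal sketch under that assumption (the structure mirrors the toy code, but this is an alternative approach, not the original solution):

from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
import os
import random
import string
import uuid

def task_a(param1, param2, mydata):
    # same idea as before: write the data to a uniquely named file and return its name
    output_filename = uuid.uuid4().hex + '.txt'
    with open(output_filename, 'w') as outfile:
        for line in mydata:
            outfile.write(line)
    return output_filename

if __name__ == '__main__':
    full_data = [[random.choice(string.ascii_lowercase) for _ in range(5)] for _ in range(100)]
    task_a_func = partial(task_a, '', '')
    with ProcessPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(task_a_func, chunk) for chunk in full_data]
        for future in as_completed(futures):
            filename = future.result()               # available as soon as that worker finishes
            os.rename(filename, filename + '.done')  # the per-file "task_b" work, done here

The trade-off is that the sequential processing then happens in the main process rather than in an extra pool worker, so it only fits if that is acceptable for your workload.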
