在使用多处理时设置Pool Map操作的时间限制?



是否可以在Python中使用multiprocessing时对pool map()操作设置时间限制?当达到时间限制时,所有子进程停止并返回它们已有的结果。

import multiprocessing as mp
def task(v):
do something
return result
if __name__ == '__main__':
vs = [...]
p= mp.Pool()
results = p.map(task, vs)

在上面的例子中,我有一个非常大的列表vs。理想情况下,列表vs中的所有元素将被发送到功能task()中,所有结果将保存在results列表中。

然而,由于列表vs非常大,我只有有限的时间来进行这个过程(假设5min)。我需要的是在达到5分钟时停止map进程,并将计算结果返回到列表results

EDIT1:

我不会终止一个需要超过5分钟才能完成的任务。假设我在列表vs中有1000个任务,并且只有600个任务在5分钟后完成。我需要的是杀死所有的子进程,并将这600个任务的结果保存到results列表中。

我看了看ForceBru提到的答案,它使用了一种叫做Pebble的东西。首先,我不理解关于"Python标准池不支持超时"的评论。它的方式是,您可以等待指定的时间来返回结果,并通过异常通知是否有,或者您可以在结果对象上发出等待,指定超时。在这种情况下不会返回异常,但是您可以测试"job"处理结果是否完成。但是,您确实不能终止单个超时作业。但是,当您处理完所有没有超时的结果时,您可以在池本身上调用terminate,这将终止池中的所有进程,无论它们是否空闲。这就导致了该回答中的第二个注释,"突然终止进程可能会导致应用程序中的奇怪行为"。这取决于超时的工作正在做什么。因此,我们同意,如果这样做可能导致奇怪的行为,我们不应该超时工作并过早地终止它们。但我看不出Pebble如何能更好地处理这个问题。

这个问题的答案实际上有一种做你想做的事的技巧。您需要放弃使用map函数,转而使用apply_async指定回调函数,以便在结果可用时保存结果。在下面的示例中,我使用5秒的TIMEOUT值只是为了演示,并且已经安排了提交超时的10个作业中的大约一半。我已经预先分配了一个名为squares的结果列表,它将保存10个结果,并且已经用10个None值初始化。当我们全部完成时,如果i值是None,这是因为正在处理值i的作业超时了。我的workder函数还返回它的参数v,以及它的计算值v ** 2,这样回调函数就知道计算结果应该在squares列表中的哪个位置:

import multiprocessing as mp
import time
def my_task(v):
time.sleep(v)
return v, v ** 2
squares = [None] * 10
def my_callback(t):
i, s = t
squares[i] = s

TIMEOUT = 5
if __name__ == '__main__':
vs = range(10)
pool = mp.Pool()
results = [pool.apply_async(my_task, args=(v,), callback=my_callback) for v in vs]
time.sleep(TIMEOUT)
pool.terminate() # all processes, busy or idle, will be terminated
print(squares)

打印:

[0, 1, 4, 9, 16, None, None, None, None, None]

第二个更复杂的方法不使用回调函数。相反,它对由调用pool.apply_async返回的每个AsynchResult实例执行get调用,并指定超时值。这里的棘手之处在于,对于初始调用,您必须使用完整的超时值。但是,当返回结果或获得超时异常时,您已经等待了一段时间,t。这意味着下次您获得超时结果时,您指定的超时值应该通过t:

减少。
import multiprocessing as mp
import time
def my_task(v):
time.sleep(6 if v == 0 else v)
return v ** 2

TIMEOUT = 5
if __name__ == '__main__':
vs = range(mp.cpu_count() - 1) # 7 on my desktop
pool = mp.Pool() # poolsize is 8
results = [pool.apply_async(my_task, args=(v,)) for v in vs]
time_to_wait = TIMEOUT # initial time to wait
start_time = time.time()
for i, result in enumerate(results):
try:
return_value = result.get(time_to_wait) # wait for up to time_to_wait seconds
except mp.TimeoutError:
print('Timeout for v = ', i)
else:
print(f'Return value for v = {i} is {return_value}')
# how much time has exprired since we began waiting?
t = time.time() - start_time
time_to_wait = TIMEOUT - t
if time_to_wait < 0:
time_to_wait = 0
pool.terminate() # all processes, busy or idle, will be terminated

打印:

Timeout for v =  0
Return value for v = 1 is 1
Return value for v = 2 is 4
Return value for v = 3 is 9
Return value for v = 4 is 16
Timeout for v =  5
Timeout for v =  6

注意

通过使用apply_async而不是map,作业被有效地提交,chunksize为1(参见mapchunksize参数,它决定了如何将可迭代参数分解为"chunks"将其放在每个进程的输入队列中,以最小化共享内存传输的数量。对于大型可迭代对象,apply_asyncmap效率低,后者使用了"reasonable"基于池大小和要处理的作业数量的默认块大小。

相关内容

  • 没有找到相关文章

最新更新