如何忽略任务失败的工作人员并将其任务重新分配给其他工作人员?



我正在使用client.map在N个单线程工作线程池(在N台机器上)上运行一个函数,其中一个工作线程失败了。我想知道是否有办法自动处理工作人员提出的异常,将其失败的任务重新分配给其他工作人员,并将其从池中忽略或排除?

我尝试使用下面显示的方法模拟问题。为了导致一个工作线程失败,我在my_function中对其提出了一个OSError,它提交给client.map如下:futures = client.map(my_function, range(100))。在我的示例中,"Computer123"上的工作线程将是失败的。为了处理my_function抛出的异常,我在exception_handler中使用了sys.exit。因此,当工作线程上的任务失败时,将调用 sys.exit。结果是,不良工作线程的 distributed.nanny 捕获故障并重新启动工作线程,同时客户端重新分发其失败的任务。但是,一旦坏工作线程再次备份,它就会再次接收任务,因为它仍在池中。它再次失败,并且该过程重复。当它继续失败时,最终其他工人完成了所有任务。如果我能自动处理来自"Computer123"等不良工人的异常并将其从池中删除,那将是理想的选择。也许我需要做的只是将其从游泳池中移除?

@exception_handler
def my_function(x):
import socket 
import time
time.sleep(5)
if socket.gethostname() == 'Computer123':
raise(OSError)
else:
return x**2
def exception_handler(orig_func):
def wrapper(*args,**kwargs):
try:
return orig_func(*args,**kwargs)
except:
import sys
sys.exit(1)
return wrapper

作为一种解决方法,您可以保留一个坏工人的字典,每次确定它是坏的时(也许在它引发一定数量的异常之后)都会添加主机名。

然后,当您要发出某些任务时,请检查它是否在违规列表中。像这样:

if socket.gethostname() in badHosts:
skip
else:
do_something()

如果您能提供有关如何管理所连接的池的更多详细信息,我可能会提供更多有关如何直接删除它们的建议,而不必每次都检查。

最新更新