Retrying failed futures in Python's ThreadPoolExecutor



I want to implement retry logic with Python's concurrent.futures.ThreadPoolExecutor. I'd like the following properties:

  1. A new future is added to the work queue as soon as a future fails.
  2. A retried future can be retried again, either indefinitely or up to a maximum retry count.

A lot of the existing code I've found online essentially works in "rounds": it calls as_completed on an initial list of futures, resubmits failed futures while collecting them in a new list, and then, if the new list is non-empty, loops back to call as_completed on the new list. Basically something like this:

with concurrent.futures.ThreadPoolExecutor(...) as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        for fut in concurrent.futures.as_completed(futures):
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        futures = new_futures

However, I don't think this satisfies the first property, because a retried future may complete before the initial futures do, yet we won't process it until all of the initial futures have completed.

Here's a hypothetical pathological case. Say we have two jobs: the first runs for 1 second but has a 90% chance of failing, while the second runs for 100 seconds. If our executor has two workers and the first job fails after 1 second, we retry it immediately. But if it fails again, we won't be able to retry it until the second job completes.
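To make the delay concrete, here's a scaled-down, runnable version of that pathological case (0.1 s and 1.0 s in place of 1 s and 100 s; the job names and the attempt-tracking bookkeeping are mine, for illustration only). With the round-based loop, the second attempt does start right away, but its failure is only processed once the slow job finishes:

```python
import concurrent.futures
import threading
import time

start = time.monotonic()
attempt_starts = []            # start time of each attempt of the flaky job
lock = threading.Lock()

def flaky():
    # Scaled-down stand-in for the 1 s job: fails on attempts 1 and 2.
    with lock:
        attempt_starts.append(round(time.monotonic() - start, 2))
        n = len(attempt_starts)
    time.sleep(0.1)
    if n < 3:
        raise RuntimeError("transient failure")
    return "ok"

def slow():
    # Stand-in for the 100 s job.
    time.sleep(1.0)
    return "slow done"

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(job): job for job in (flaky, slow)}
    while futures:
        new_futures = {}
        for fut in concurrent.futures.as_completed(futures):
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(job)] = job
        futures = new_futures

# Attempt 2 starts almost immediately (~0.1 s, since a worker is free), but
# its failure isn't seen until the round-1 as_completed loop finishes, so
# attempt 3 only starts after the slow job completes (~1.0 s).
print(attempt_starts)
```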


So my question is: without using external libraries or rewriting the low-level executor logic, is it possible to implement retry logic with these desired properties? One thing I tried is putting the retry logic inside the code sent to the workers:

def worker_job(fn):
    try:
        return fn()
    except Exception:
        executor.submit(fn)

with concurrent.futures.ThreadPoolExecutor(...) as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    executor.map(worker_job, jobs)

But submitting new jobs from within a job doesn't seem to work.

Retrying with as_completed

The simple way

Loop with wait(..., return_when=FIRST_COMPLETED) instead of as_completed(...).

Trade-offs:

  1. Overhead from the pending futures (re-adding the waiters, building new_futures)
  2. Trouble if you want to specify an overall timeout
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        done, pending = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        for fut in done:
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        for fut in pending:
            job = futures[fut]
            new_futures[fut] = job
        futures = new_futures
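The asker's second property (a maximum retry count) can be layered on top of this loop by tracking an attempt count per future. A runnable sketch; run_with_retries, the results/failures dicts, and the default of 3 attempts are my own naming and choices, not anything from the standard library:

```python
import concurrent.futures

def run_with_retries(executor, fn, jobs, max_retries=3):
    # Map each future to (job, number of attempts already made).
    futures = {executor.submit(fn, job): (job, 1) for job in jobs}
    results, failures = {}, {}
    while futures:
        done, pending = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        new_futures = {}
        for fut in done:
            job, attempts = futures[fut]
            exc = fut.exception()
            if exc is None:
                results[job] = fut.result()
            elif attempts < max_retries:
                # Resubmit immediately; the failure counts against the budget.
                new_futures[executor.submit(fn, job)] = (job, attempts + 1)
            else:
                failures[job] = exc  # give up after max_retries attempts
        for fut in pending:
            new_futures[fut] = futures[fut]
        futures = new_futures
    return results, failures

# Usage: "bad" fails on every attempt, "a" succeeds on the first.
calls = {}
def fn(job):
    calls[job] = calls.get(job, 0) + 1
    if job == "bad":
        raise ValueError(job)
    return job.upper()

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
    results, failures = run_with_retries(ex, fn, ["a", "bad"])
print(results, calls["bad"])
```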

The efficient way

Adapt as_completed(...) so new futures can be added to fs and pending, and reuse its waiter.

Trade-off: maintenance.

Advantage: an overall timeout can be specified if needed.

import time
# Private helpers copied from concurrent.futures._base; these are CPython
# implementation details and may change between Python versions.
from concurrent.futures._base import (
    CANCELLED_AND_NOTIFIED, FINISHED, _AS_COMPLETED,
    _AcquireFutures, _create_and_install_waiters, _yield_finished_futures)

class AsCompletedWaiterWrapper:
    def __init__(self):
        self.fs = None
        self.pending = None
        self.waiter = None

    def listen(self, fut):
        with self.waiter.lock:
            self.fs.add(fut)
            self.pending.add(fut)
            fut._waiters.append(self.waiter)

    def as_completed(self, fs, timeout=None):
        """
        concurrent.futures.as_completed plus the 3 lines marked with +.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()
        fs = set(fs)
        total_futures = len(fs)
        with _AcquireFutures(fs):
            finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
            pending = fs - finished
            waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
            self.fs = fs            # +
            self.pending = pending  # +
            self.waiter = waiter    # +
        finished = list(finished)
        try:
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs,))
            while pending:
                if timeout is None:
                    wait_timeout = None
                else:
                    wait_timeout = end_time - time.monotonic()
                    if wait_timeout < 0:
                        raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                                len(pending), total_futures))
                waiter.event.wait(wait_timeout)
                with waiter.lock:
                    finished = waiter.finished_futures
                    waiter.finished_futures = []
                    waiter.event.clear()
                # reverse to keep finishing order
                finished.reverse()
                yield from _yield_finished_futures(finished, waiter,
                                                   ref_collect=(fs, pending))
        finally:
            # Remove waiter from unfinished futures
            for f in fs:
                with f._condition:
                    f._waiters.remove(waiter)

Usage:

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    w = AsCompletedWaiterWrapper()
    for fut in w.as_completed(futures):
        if fut.exception():
            job = futures[fut]
            new_fut = executor.submit(fn, job)
            futures[new_fut] = job
            w.listen(new_fut)
        else:
            ...  # logic to handle successful job

Retrying from within the job helper

Wait for the events inside the with ... executor: block, because ThreadPoolExecutor.__exit__ shuts the executor down, after which no new futures can be scheduled.

Trade-offs:

  1. Can't be used with ProcessPoolExecutor, because worker_job references the executor from the main process
  2. Trouble if you want to specify an overall timeout
def worker_job(fn, event):
    try:
        rv = fn()
        event.set()
        return rv
    except Exception:
        executor.submit(worker_job, fn, event)

with concurrent.futures.ThreadPoolExecutor() as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    events = [threading.Event() for _ in range(len(jobs))]
    executor.map(worker_job, jobs, events)
    for e in events:
        e.wait()
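One thing the snippet above drops is the return value of retried runs: the future created by the inner submit is never inspected. A runnable sketch that stores results in a shared dict instead; the results dict, the key argument, and the flaky test job are my additions for illustration:

```python
import concurrent.futures
import functools
import threading

results = {}   # shared store for return values, keyed by job

def worker_job(fn, key, event, executor):
    try:
        results[key] = fn()
        event.set()
    except Exception:
        # Resubmit ourselves; allowed because the executor is still open.
        executor.submit(worker_job, fn, key, event, executor)

state = {"fails": 0}
def flaky(x):
    # Fails twice, then succeeds on the third attempt.
    state["fails"] += 1
    if state["fails"] < 3:
        raise RuntimeError("transient")
    return x * 2

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    event = threading.Event()
    executor.submit(worker_job, functools.partial(flaky, 21),
                    "job1", event, executor)
    event.wait()   # wait inside the with-block, before __exit__ shuts down
print(results)
```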

You said:

    But if it fails again, we won't be able to retry it until the second job completes.

But I don't think that's true. The code in question is:

for fut in concurrent.futures.as_completed(futures):

This is subtle, because futures is used there not as a dict but as an iterable: specifically, an iterable of the futures to check for completion. That iterable supplies the set of futures to watch, and as_completed() then yields them as they complete. So in your pathological case, the 1 s job can indeed be retried multiple times before the 100 s job completes.
