ThreadPoolExecutor和as_completed:它是否等待所有期货完成



我有一个用例,需要处理几个"任务";对象。这些任务对象基本上是网络爬虫。每当任务完成时,我都希望使用相同的参数尽快重新启动它。

这就是我现在拥有的:

tasks = create_collection_tasks()
with concurrent.futures.ThreadPoolExecutor(max_workers=len(tasks)) as executor:
# Maps the future to a collection task. The mapping is in this direction bc
# later on we can only iterate over the completed futures.
futures_to_tasks = {
executor.submit(task.callback, task.data): task for task in tasks
}
while futures_to_tasks:
for future in concurrent.futures.as_completed(futures_to_tasks):
# If the future raised an exception, calling result() will re-raise
try:
future.result()
except Exception:
logger.exception("Collection task failed")
# When the future is done (completed or crashed/cancelled), put it back
# in the 'queue' to re-run it
task = futures_to_tasks.pop(future)
futures_to_tasks[executor.submit(task.callback, task.data)] = task

我已经阅读了as_completed的文档:https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed然而,我仍然不清楚它的行为;快照";调用futures_to_tasks时处于CCD_3中的期货,并在完成循环之前等待所有这些期货完成,如果它只是遍历期货并产生已完成的期货,而不等待其他期货。我宁愿以后的行为。

你能帮我一下吗?

让我们假设您希望无限期地运行(这似乎隐含在您的问题中(。您可以考虑这种模式(解释如下代码(:

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def trun(v):
print(v)
sleep(0.5)
with ThreadPoolExecutor() as executor:
tasks = ['a', 'b', 'c', 'd', 'e']
futures = dict()
while True:
sleep(0.5)
for i, t in enumerate(tasks):
if (f := futures.get(i, None)) is not None:
if not f.running():
print(f'Thread {i} has finished. Awaiting result')
f.result()
print(f'Restarting thread {i}')
else:
print(f'Thread {i} still running')
continue
else:
print(f'Starting new thread {i}')
futures[i] = executor.submit(trun, t)

我们有许多任务由任务列表中的字符串表示。

我们构造了一个字典,其中的键将是任务的索引,关联的值将是Future对象。在这种特殊情况下,我们可以使用列表中的值作为键,但这在现实世界中可能不合适。

有一些硬编码的延迟只是为了模拟实际工作。您需要考虑线程在实际中运行的时间,以确定是否需要生产延迟(以避免循环中的抖动(。

枚举任务并检查字典中是否有对任务id(列表索引(的引用。如果有,请检查它是否仍在运行。如果有,则获取结果。

其他一切都应该不言自明。

此代码是可运行的,但您需要手动停止它(Ctrl-C将是理想的(

您可以考虑使用某种形式的sentinel,它将在线程中设置以指示完成,并且可以在主循环中进行检查。然而,如果处理得太天真,可能会出现竞争情况

最新更新