Python 执行器从完成回调生成任务(递归提交任务)



我正在尝试从已完成的任务的结果提交进一步的任务:

with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(my_task)
def callback(future):
for another_task in future.result():
future = executor.submit(another_task)
future.add_done_callback(callback)
future.add_done_callback(callback)

但我得到:

运行时错误:关闭后无法安排新的期货

让执行者保留回调的最佳方法是什么?信号量?

理想情况下,如果将ThreadPoolExecutor替换为ProcessPoolExecutor,则解决方案应很好地转移。

callback在单独的线程中执行。因此,当您的回调逻辑开始执行时,主循环很有可能会让上下文管理器关闭Executor。这就是为什么你观察RuntimeError.

最简单的解决方法是按顺序运行逻辑。

futures = []
with concurrent.futures.ThreadPoolExecutor() as pool:
futures.append(pool.submit(task))
while futures:
for future in concurrent.futures.as_completed(futures):
futures.remove(future)
for new_task in future.result():
futures.append(pool.submit(new_task))

请注意,此代码可能会导致向Executor呈指数级方式提交任务。

此解决方案保证在任何给定点最大化处理。sleep的使用不是那么优雅,但这是我迄今为止最好的。

with concurrent.futures.ThreadPoolExecutor() as executor:
pending_tasks = 1
future = executor.submit(my_task)
def callback(future):
nonlocal pending_tasks
for another_task in future.result():
pending_tasks += 1
future = executor.submit(another_task)
future.add_done_callback(callback)
pending_tasks -= 1
future.add_done_callback(callback)
while pending_tasks:
time.sleep(10)

相关内容

  • 没有找到相关文章

最新更新