How to signal to a multithreaded function that it can stop



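I have a list of file paths, and I run a function (called process_notebook()) on each path. I don't think what the function does is particularly important. What matters is that if certain dependencies (which are always the same function run against other files) have not been met, it has to defer. It signals this by returning False; if it runs without deferring, it returns True. Currently, I'm doing this: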


last_count = len(deferred_notebooks)
while len(deferred_notebooks) > 0:
    # Iterate over a copy so that removing items does not skip elements.
    for nb in list(deferred_notebooks):
        if process_notebook(nb, True):
            deferred_notebooks.remove(nb)
    # This should clear min one deferral each iteration. If it doesn't, then some dependency
    # is impossible to meet and we should give up.
    if len(deferred_notebooks) == last_count:
        break
    last_count = len(deferred_notebooks)

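This works fine, but of course it processes the notebooks sequentially, and in many cases they cannot run until several iterations have passed (for example, in a dependency chain), which makes it somewhat slow. I'd like to speed this up by processing them concurrently, like this: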

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=25) as executor:
    for nb in deferred_notebooks:
        executor.submit(process_notebook, nb, True)

In this case, process_notebook() should be able to simply call executor.submit() and re-add itself to the queue if its dependencies are not met.

What I don't know is how to replicate this part:

# This should clear min one deferral each iteration. If it doesn't, then some dependency
# is impossible to meet and we should give up.
if len(deferred_notebooks) == last_count:
    break

How do I signal to the function that it needs to give up trying because some impossible dependency has not cleared?

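If I understand correctly, you have changed the algorithm from batched serial processing to continuous concurrent processing. As a result, you have lost the checkpoint at the end of each batch, where you could say "if every item failed to process, abort."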

One option is to process in parallel batches, something like:

from concurrent.futures import ThreadPoolExecutor

def process_item(item):
    # return True if processed successfully, False otherwise
    ...

with ThreadPoolExecutor(...) as executor:
    while len(items) > 0:
        failed = []
        # executor.map yields results in the same order as items
        for (item, ok) in zip(items, executor.map(process_item, items)):
            if not ok:
                failed.append(item)
        if len(failed) == len(items):  # all failed, so no further progress is possible
            break
        items = failed
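
To make the batching idea concrete, here is a minimal runnable sketch under some assumptions. The DEPENDENCIES map, the shared done set, and the run_in_batches() helper are hypothetical names invented for illustration, and this toy process_notebook() takes a single argument rather than the two used in the question. Each batch runs in parallel, and the loop stops as soon as a whole batch fails to make progress.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

# Hypothetical dependency map: each notebook lists the notebooks it needs first.
DEPENDENCIES = {
    "a": [],
    "b": ["a"],
    "c": ["b"],
    "d": ["missing"],  # this dependency can never be satisfied
}

done = set()                  # notebooks that have finished successfully
done_lock = threading.Lock()  # guards access to `done` across worker threads


def process_notebook(nb):
    """Toy stand-in: return True if nb ran, False if it must be deferred."""
    with done_lock:
        ready = all(dep in done for dep in DEPENDENCIES[nb])
    if not ready:
        return False
    # ... the real work would happen here ...
    with done_lock:
        done.add(nb)
    return True


def run_in_batches(notebooks, max_workers=4):
    """Process notebooks in parallel batches; return those that never ran."""
    pending = list(notebooks)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        while pending:
            # Wait for the whole batch: this is the per-batch checkpoint.
            results = list(executor.map(process_notebook, pending))
            failed = [nb for nb, ok in zip(pending, results) if not ok]
            if len(failed) == len(pending):
                break  # nothing cleared in this batch, so give up
            pending = failed
    return pending


unresolved = run_in_batches(["c", "b", "a", "d"])
print(unresolved)
```

How many batches this takes depends on thread timing, but every batch that still contains a runnable notebook clears at least one of them, so only the notebook with the unsatisfiable dependency should be left over at the end.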
