我写了这本简短的POC,以帮助了解我所遇到的问题,希望有人可以向我解释发生了什么以及如何修复和/或使其更有效。
我使用迭代器,Itertools和Generator的目标是因为我不想在内存中存储一个巨大的列表,因为我扩大列表将变得难以管理,而且我不想在整个列表上循环循环每次做某事。请注意,我对生成器,迭代器和多处理的想法是相当陌生的,并今天编写了此代码,因此,如果您可以清楚地告诉我我很想念有关这些事情如何工作的工作流程,请教育我并帮助我更好的代码。
您应该能够按原样运行代码,并查看我面临的问题。我期待一旦抓住例外,它就会提高并死去,但是我看到的是发生的,例外被抓住了,但其他过程仍在继续。
如果我评论generateRange
生成器并创建虚拟列表并将其传递到futures = (map(executor.submit, itertools.repeat(execute), mylist))
中,则确实会捕获异常并按照预期退出脚本。
我的猜测是,生成器/迭代器必须在脚本死亡之前完成生成范围,据我所知,这并不是这样。
我选择使用发电机函数/迭代器的原因是因为您只能在需要时访问它们。
我是否有一种方法可以阻止发电机继续进行,并适当提高异常。
这是我的POC:
import concurrent.futures
PRIMES = [0]*80
import time
def is_prime(n):
print("Enter")
time.sleep(5)
print("End")
1/0
child = []
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
for i in PRIMES:
child.append(executor.submit(is_prime, i))
for future in concurrent.futures.as_completed(child):
if future.exception() is not None:
print("Throw an exception")
raise future.exception()
if __name__ == '__main__':
main()
编辑:我用更简单的内容更新了POC。
不可能立即取消运行期货,但这至少使它仅在增加例外后运行了几个过程:
import concurrent.futures
PRIMES = [0]*80
import time
def is_prime(n):
print("Enter")
time.sleep(5)
print("End")
1/0
child = []
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
for i in PRIMES:
child.append(executor.submit(is_prime, i))
for future in concurrent.futures.as_completed(child):
if future.exception() is not None:
for fut in child:
fut.cancel()
print("Throw an exception")
raise future.exception()
if __name__ == '__main__':
main()