我创建了这个示例代码,试图解释我的问题是关于:
from concurrent.futures import ThreadPoolExecutor
def my_method(x):
if x == 2:
print("error")
raise Exception("Exception raised")
else:
print("ok")
with ThreadPoolExecutor() as executor:
executor.map(my_method, [1, 2, 3])
我有一个类似的代码在我的应用程序,但是,而不是打印消息,它只是调用AWS Lambda函数。问题是,在my_method
函数中调用lambda之前的一些行可能会抛出异常。如果我不需要读取executor.map
方法的结果,代码就不会抛出异常,它会完成它能完成的所有工作。在本例中,输出为:
> ok
> error
> ok
我知道如果我这样做:
with ThreadPoolExecutor() as executor:
for r in executor.map(my_method, [1, 2, 3]):
print(r)
这几行将抛出一个错误,并不是所有的任务都将执行。在我的例子中,并不是所有的AWS lambda函数都会被触发。但是如果我像第一个例子一样使用execute方法,我的代码将尽可能地触发所有lambda。这样做安全吗?
我问这个是因为它可能是一些内存泄漏,或者我可能不知道的东西。
这种情况下的最佳实践是什么?
你不会得到内存泄漏,这不是Python会做的事情。但是,我建议不要这样使用Executor.map
。当您不使用map迭代器时,它可以工作的原因是结果被缓冲以供其使用。由于迭代器在遇到异常时明显停止,因此预期的行为是停止执行。我不会指望任何特定的行为来打开工作项。
我看到两种可能的方法:
-
在映射函数中捕获异常。看起来你想忽略这个错误,为什么不这样做呢?
-
切换到
Executor.submit
,可以更好地处理异常。
futures = [executor.submit(my_method, arg) for arg in [1, 2, 3]]
# wait until done. Return exception or None
exceptions = [future.exception() for future in futures]
# keep only actual exceptions
exceptions = [exception for exception in exceptions if exception is not None]
查看源代码,executor.map
是这样实现的:
fs = [self.submit(fn, *args) for args in zip(*iterables)]
# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
try:
# reverse to keep finishing order
fs.reverse()
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield _result_or_cancel(fs.pop())
else:
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
finally:
for future in fs:
future.cancel()
return result_iterator()
基本上和我做的一样。请注意,一旦遇到异常,所有剩余的期货都将被取消。