我有一个要使用多处理的元素列表。问题是,对于某些特定的输入(在尝试之前无法观察(,这是我功能失速的一部分。我在概念上使用下面的代码显示了这一点,函数sometimes_stalling_processing()
有时会无限期地停滞。
要将其放在上下文中,我正在使用Web刮板处理一堆链接,即使在请求模块中使用超时,其中一些链接也会停滞。我尝试了不同的方法(例如使用eventlet
(,但得出的结论是,在多处理级别上处理它也许更容易。
def stable_processing(obs):
...
return processed_obs
def sometimes_stalling_processing(obs):
...
return processed_obs
def extract_info(obs):
new_obs = stable_processing(obs)
try:
new_obs = sometimes_stalling_processing(obs)
except MyTimedOutError: # error doesn't exist, just here for conceptual purposes
pass
return new_obs
pool = Pool(processes=n_threads)
processed_dataset = pool.map(extract_info, dataset)
pool.close()
pool.join()
这个问题(我如何在暂停之后中止多处理中的任务?(似乎非常相似,但是我无法将其转换为使用map
而不是apply
。我还尝试使用eventlet
软件包,但这不起作用。请注意,我正在使用Python 2.7。
如何在单个观察值上进行pool.map()
超时并杀死sometimes_stalling_processing
?
您可以查看卵石库。
from pebble import ProcessPool
from concurrent.futures import TimeoutError
def sometimes_stalling_processing(obs):
...
return processed_obs
with ProcessPool() as pool:
future = pool.map(sometimes_stalling_processing, dataset, timeout=10)
iterator = future.result()
while True:
try:
result = next(iterator)
except StopIteration:
break
except TimeoutError as error:
print("function took longer than %d seconds" % error.args[1])
文档中的更多示例。