从多处理中的异常返回值



我正在调用一个函数task(url, param1, param2),该函数返回对 url =url的 API 调用的结果,或者如果 API 调用不起作用,则返回url名称。我的task看起来像这样:

def task(url, param1, param2):
try:
make_api_call(url, param1, param2)
except ValueError as e:
print("val error")            
return url

现在我想将task应用于包含 100 个 url 的列表,并开始将它们multiprocessing为:

import multiprocessing as mp
def run_tasks(urls, param1, param2):
jobs = []
for i in range(len(urls)):
process = mp.Process(target=task, args=(urls[i], param1, param2))
jobs.append(process)
## catch error processes
error_urls = []
## start processes
for j in jobs:
j.start()
## finish processes
for j in jobs:
j.join()

从上面的run_tasks,我将如何返回给我一个ValueErrorurl的列表?我试过error_urls.append(j.join()),但这不起作用。

有两种方法可以从过程中获得结果。

方法 1.使用Manager中的list。无需使用锁定在进程之间同步。

from multiprocessing import Process, Manager
def task(url, param1, param2, error_list):
try:
make_api_call(url, param1, param2)
except ValueError as e:
print("val error")            
error_list.append(url)
def run_tasks(urls, param1, param2):
error_list = Manager().list()    
jobs = []
for i in range(len(urls)):
process = Process(target=task, args=(urls[i], param1, param2, error_list))
jobs.append(process)
## start processes
for j in jobs:
j.start()
## finish processes
for j in jobs:
j.join()

方法 2.使用concurrent.futures中的ProcessPoolExecutor。此方法易于理解且代码较少。

from concurrent import futures
def task(url, param1, param2):
try:
make_api_call(url, param1, param2)
except ValueError as e:
print("val error")            
return url
def runt_tasks(urls, param1, param2):
with futures.ProcessPoolExecutor() as executor:
result = executor.map(task, urls, [param1] * len(urls), [param2] * len(urls))
error_list = [item for item in result if item is not None]

最后,从问题的描述。这是一个 IO 敏感问题。我建议您使用ThreadPoolExecutor.执行 IO 操作时,线程将释放 GIL 以允许其他线程运行。对于 CPU 敏感问题,最好使用ProcessPoolExecutor.asyncio是在 Python 3 中进行并发编程的另一种选择。

尝试共享内存。 使用此multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *args[, lock])

您可以在 中定义run_tasks

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Array
lock = Lock()
error_urls = Array(c_char_p, [], lock = lock)

def task(url, param1, param2):
try:
make_api_call(url, param1, param2)
except ValueError as e:
print("val error")            
error_urls.append(url)

作为 Array(( 的文档:

与 RawArray(( 相同,只是取决于锁的值 a 可以返回进程安全的同步包装器,而不是 原始 ctypes 数组。

因此,它是过程安全的。 更多关于 Array(( 可以参考这个,关于 ctypes(c_char_p(参考这个

相关内容

  • 没有找到相关文章