我正在调用一个函数task(url, param1, param2)
,该函数返回对 url =url
的 API 调用的结果,或者如果 API 调用不起作用,则返回url
名称。我的task
看起来像这样:
def task(url, param1, param2):
try:
make_api_call(url, param1, param2)
except ValueError as e:
print("val error")
return url
现在我想将task
应用于包含 100 个 url 的列表,并开始将它们multiprocessing
为:
import multiprocessing as mp
def run_tasks(urls, param1, param2):
jobs = []
for i in range(len(urls)):
process = mp.Process(target=task, args=(urls[i], param1, param2))
jobs.append(process)
## catch error processes
error_urls = []
## start processes
for j in jobs:
j.start()
## finish processes
for j in jobs:
j.join()
从上面的run_tasks
,我将如何返回给我一个ValueError
的url
的列表?我试过error_urls.append(j.join())
,但这不起作用。
有两种方法可以从过程中获得结果。
方法 1.使用Manager
中的list
。无需使用锁定在进程之间同步。
from multiprocessing import Process, Manager
def task(url, param1, param2, error_list):
try:
make_api_call(url, param1, param2)
except ValueError as e:
print("val error")
error_list.append(url)
def run_tasks(urls, param1, param2):
error_list = Manager().list()
jobs = []
for i in range(len(urls)):
process = Process(target=task, args=(urls[i], param1, param2, error_list))
jobs.append(process)
## start processes
for j in jobs:
j.start()
## finish processes
for j in jobs:
j.join()
方法 2.使用concurrent.futures
中的ProcessPoolExecutor
。此方法易于理解且代码较少。
from concurrent import futures
def task(url, param1, param2):
try:
make_api_call(url, param1, param2)
except ValueError as e:
print("val error")
return url
def runt_tasks(urls, param1, param2):
with futures.ProcessPoolExecutor() as executor:
result = executor.map(task, urls, [param1] * len(urls), [param2] * len(urls))
error_list = [item for item in result if item is not None]
最后,从问题的描述。这是一个 IO 敏感问题。我建议您使用ThreadPoolExecutor
.执行 IO 操作时,线程将释放 GIL 以允许其他线程运行。对于 CPU 敏感问题,最好使用ProcessPoolExecutor
.asyncio
是在 Python 3 中进行并发编程的另一种选择。
尝试共享内存。 使用此multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *args[, lock])
您可以在 中定义run_tasks
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Array
lock = Lock()
error_urls = Array(c_char_p, [], lock = lock)
和
def task(url, param1, param2):
try:
make_api_call(url, param1, param2)
except ValueError as e:
print("val error")
error_urls.append(url)
作为 Array(( 的文档:
与 RawArray(( 相同,只是取决于锁的值 a 可以返回进程安全的同步包装器,而不是 原始 ctypes 数组。
因此,它是过程安全的。 更多关于 Array(( 可以参考这个,关于 ctypes(c_char_p(参考这个