如何使用fastapi方法在python中并行运行具有不同参数的两个函数?



我有两个函数接受不同的参数作为输入,我正在寻找一种方法来并行运行它们一次,两个函数返回的东西,我想得到的结果也回来,

def func1(arg1),
return arg2
def func1(arg2):
return args

我试过使用,ray,但它比顺序执行要花很多时间,

import ray
ray.init()
@ray.remote
a = func1.remote([1,2,3])
b = func2.remote([4,5,6])
func1_results, func2_results = ray.get([a, b])

我知道我们可以使用多处理,但是我如何存储返回的结果,这是正确的方法吗?另外,我如何为每个函数分别传递参数?

我从另一个答案中得到这个例子,

from multiprocessing import Process

p1 = Process(target=method1) # create a process object p1
p1.start()                   # starts the process p1
p2 = Process(target=method2)
p2.start()

注意:Multiprocessing不能直接在fastapi方法中工作

为了简单起见,您可以使用线程,但如果您确实必须使用Multiprocessing,则可以与队列共享结果。通过参数传递队列或在作用域外初始化队列。

以你为例…

from multiprocessing import Process,Queue
import os
def func1(arg1,queue):
return queue.put(arg1)
def func2(arg2,queue):
queue.put(arg2)
if __name__ == '__main__':
queue = Queue()
p1 = Process(target=func1, args=([1,2,3],queue))
p2 = Process(target=func2, args=([4,5,6],queue))
p1.start()
p2.start()
p1.join()
p2.join()
queue.put("Done")

while True:
msg = queue.get() 
print(msg)
if msg =="Done":
break

输出:

[4, 5, 6]
[1, 2, 3]
Done

如果你使用的是最新的Python版本,你可以使用asyncio、协程和任务来实现你想要的。https://docs.python.org/3/library/asyncio-task.html

import asyncio
import time
async def func1(arg1):
await asyncio.sleep(2)
return "1 says " + str(arg1)
async def func2(arg1):
await asyncio.sleep(3)
return "2 says " + str(arg1)
async def main():
task1 = asyncio.create_task(func1('hello'))
task2 = asyncio.create_task(func2('world'))
print(f"started at {time.strftime('%X')}")
print('should take 3 secs, not 5.')
done = await asyncio.gather(task1, task2)
print(done)
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())

样例输出。请注意,如果func1和func2是cpu绑定的,那么asyncio将无法如此公平地调度它们,正如另一个答案所提到的。

$ python3 user_12_gather.py
started at 19:29:24
should take 3 secs, not 5.
['1 says hello', '2 says world']
finished at 19:29:27

真正的答案是,它几乎完全取决于func1和func2将要做什么。Python 3中有许多并行选项,每个选项都是为不同的用例设计的。

如果每个函数都在读取一个数据流,并对每个项目做一些相对快速的操作,那么asyncio将非常适合这个用例,因为每个函数将花费大部分时间等待io完成,而asyncio将能够在它们之间切换执行上下文非常好和容易。

然而,如果函数是cpu限制的(例如,你要发送每个函数来计算pi到1000位数字),那么iosync将不能很好地工作,因为它没有机会在它们之间切换。您可以通过添加async.sleep(0)来改进这一点,作为提示,这将是一个很好的切换位置,但基本上一次只能运行一个。

对于cpu受限用例,您需要多个执行上下文来实现并行性。最简单的选择是单个进程中的线程,但是正如您所注意到的,Python中的全局解释器锁定限制了该解决方案的可实现性能。

另一种选择是将每个工作流分支到一个单独的进程中,让内核处理并发性。有很多方法可以实现它,从简单的fork到子进程模块,再到多进程模块。在复杂性和功能性之间进行权衡时,每一种都有自己的优点,但从根本上说,它们都分叉单独的进程,并让内核处理并行性。

最新更新