我有一个加载数据并循环时间的函数,例如
def calculate_profit(account):
account_data = load(account) #very expensive operation
for day in account_data.days:
print(account_data.get(day).profit)
因为数据的加载成本很高,所以使用 joblib/multiprocessing 来做这样的事情是有意义的:
arr = [account1, account2, account3, ...]
joblib.Parallel(n_jobs=-1)(delayed(calculate_profit)(arr))
但是,我还有另一个昂贵的函数,我想将其应用于calculate_profit
函数的中间结果。例如,假设汇总所有利润并将其处理/发布到网站/等是一项昂贵的操作。此外,我需要前一天的利润来计算此函数中的利润变化。
def expensive_sum(prev_day_profits, *account_profits):
total_profit_today = sum(account_profits)
profit_difference = total_profit_today - prev_day_profits
#some other expensive operation
#more expensive operations
所以我想
- 并行运行多处理进程(以减少加载所有昂贵帐户数据的负载(
- 一旦每个多处理过程到达预定义的点(例如,完成循环的一次迭代(,将这些中间值返回到另一个函数(
expensive_sum
(进行处理 -假设每个单独的多处理过程不能继续,直到expensive_sum
返回 - 但是,我想保持多处理进程处于活动状态,这样我就不必重新初始化它们(减少开销(
有什么办法可以做到这一点吗?
from multiprocessing import Manager
queue = manager.Queue()
一旦每个多处理过程到达预定义的点 做
queue.put(item)
同时,另一个昂贵的功能可以
queue.get(item) ==> blocking call for get
昂贵的函数等待get
,并在获得值时继续处理它并再次等待get