多处理过程中间输出



我有一个加载数据并循环时间的函数,例如

def calculate_profit(account):
account_data = load(account) #very expensive operation
for day in account_data.days:
print(account_data.get(day).profit)

因为数据的加载成本很高,所以使用 joblib/multiprocessing 来做这样的事情是有意义的:

arr = [account1, account2, account3, ...]
joblib.Parallel(n_jobs=-1)(delayed(calculate_profit)(arr))

但是,我还有另一个昂贵的函数,我想将其应用于calculate_profit函数的中间结果。例如,假设汇总所有利润并将其处理/发布到网站/等是一项昂贵的操作。此外,我需要前一天的利润来计算此函数中的利润变化。

def expensive_sum(prev_day_profits, *account_profits):
total_profit_today = sum(account_profits)
profit_difference = total_profit_today - prev_day_profits
#some other expensive operation
#more expensive operations

所以我想

  1. 并行运行多处理进程(以减少加载所有昂贵帐户数据的负载(
  2. 一旦每个多处理过程到达预定义的点(例如,完成循环的一次迭代(,将这些中间值返回到另一个函数(expensive_sum(进行处理 -假设每个单独的多处理过程不能继续,直到expensive_sum返回
  3. 但是,我想保持多处理进程处于活动状态,这样我就不必重新初始化它们(减少开销(

有什么办法可以做到这一点吗?

from multiprocessing import Manager
queue = manager.Queue()

一旦每个多处理过程到达预定义的点 做

queue.put(item)

同时,另一个昂贵的功能可以

queue.get(item)  ==>  blocking call for get

昂贵的函数等待get,并在获得值时继续处理它并再次等待get

相关内容

  • 没有找到相关文章

最新更新