使用Python更新并行计算的参数



我有一些Python代码可以执行下面列出的操作。calc_result()函数根据输入参数生成结果。在每一步,这些输入参数都被更新以计算一组新的结果。重复此过程直到最后一步。下面给出了该过程的示例代码。

  1. 定义初始参数
  2. 根据初始参数计算结果
  3. 根据结果更新参数
  4. 根据更新的参数计算结果
  5. 重复3和4,直到最后一步

工作示例

import numpy as np
import random
import time
def calc_params(res: list) -> list:
time.sleep(random.random())
return [r * 1.1 for r in res]
def calc_result(param: float):
time.sleep(random.random())
return param + 1
def main():
tic = time.perf_counter()
nsteps = 10
nmodels = 4
init_params = [5, 4.5, 8, 2]
steps = list(range(nsteps))
params = np.zeros((nsteps, nmodels))
params[0] = init_params
results = np.zeros((nsteps, nmodels))
for step in steps:
step_results = []
for p in params[step]:
out = calc_result(p)
step_results.append(out)
results[step] = step_results
if step < nsteps - 1:
params[step + 1] = calc_params(step_results)
toc = time.perf_counter()
print(f'nElapsed time {toc - tic:.2f} sn')
print(f'Parametersn{params}n')
print(f'Resultsn{results}')
if __name__ == '__main__':
np.set_printoptions(precision=2)
main()

此示例打印以下输出:

Elapsed time 22.75 s
Parameters
[[ 5.    4.5   8.    2.  ]
[ 6.6   6.05  9.9   3.3 ]
[ 8.36  7.76 11.99  4.73]
[10.3   9.63 14.29  6.3 ]
[12.43 11.69 16.82  8.03]
[14.77 13.96 19.6   9.94]
[17.34 16.46 22.66 12.03]
[20.18 19.21 26.03 14.33]
[23.3  22.23 29.73 16.87]
[26.73 25.55 33.8  19.65]]
Results
[[ 6.    5.5   9.    3.  ]
[ 7.6   7.05 10.9   4.3 ]
[ 9.36  8.76 12.99  5.73]
[11.3  10.63 15.29  7.3 ]
[13.43 12.69 17.82  9.03]
[15.77 14.96 20.6  10.94]
[18.34 17.46 23.66 13.03]
[21.18 20.21 27.03 15.33]
[24.3  23.23 30.73 17.87]
[27.73 26.55 34.8  20.65]]

Dask示例

我尝试使用Dask对代码进行并行化,如下所示。

import numpy as np
import random
import time
from dask.distributed import Client, get_client, secede, rejoin
def calc_params(res: list) -> list:
time.sleep(random.random())
return [r * 1.1 for r in res]
def calc_result(param: float):
time.sleep(random.random())
return param + 1
def solve_step(step: int, nsteps: int, params: np.ndarray) -> list:
client = get_client()
futures = client.map(calc_result, params[step], priority=10)
secede()
step_results = client.gather(futures)
rejoin()
if step < nsteps - 1:
params[step + 1] = calc_params(step_results)
return step_results
def main():
tic = time.perf_counter()
nsteps = 10
nmodels = 4
init_params = [5, 4.5, 8, 2]
params = np.zeros((nsteps, nmodels))
params[0] = init_params
steps = list(range(nsteps))
futures = client.map(solve_step, steps, pure=False, nsteps=nsteps, params=params)
results = client.gather(futures)
results = np.array(results)
toc = time.perf_counter()
print(f'nElapsed time {toc - tic:.2f} sn')
print(f'Parametersn{params}n')
print(f'Resultsn{results}')
if __name__ == '__main__':
np.set_printoptions(precision=2)
client = Client(n_workers=8)
print('n' + client.dashboard_link)
main()
client.close()

Dask示例的输出:

Elapsed time 1.85 s
Parameters
[[5.  4.5 8.  2. ]
[0.  0.  0.  0. ]
[0.  0.  0.  0. ]
[0.  0.  0.  0. ]
[0.  0.  0.  0. ]
[0.  0.  0.  0. ]
[0.  0.  0.  0. ]
[0.  0.  0.  0. ]
[0.  0.  0.  0. ]
[0.  0.  0.  0. ]]
Results
[[6.  5.5 9.  3. ]
[1.  1.  1.  1. ]
[1.  1.  1.  1. ]
[1.  1.  1.  1. ]
[1.  1.  1.  1. ]
[1.  1.  1.  1. ]
[1.  1.  1.  1. ]
[1.  1.  1.  1. ]
[1.  1.  1.  1. ]
[1.  1.  1.  1. ]]

Dask示例没有更新参数值,因此没有正确计算结果。有没有一种方法可以在并行计算结果的同时更新参数数组?我试图遵循Dask示例中的方法来处理进化工作流,但它似乎不适用于这个问题。Dask也有Actors,但根据文档,这是一个实验性功能,Dask面板在使用Actors时不显示信息。另一个用于扩展Python的包是Ray,但我没有使用它的经验

根据您的问题描述,迭代过程似乎对算法很重要。所以我不知道你应该如何将其并行化。Dask无法神奇地知道下一步的结果是什么;不断发展的工作流程";在该示例中,指的是触发更多作业的工作流,这与您的情况不同,在您的情况下,每个工作流阶段都取决于前一个阶段。

考虑一个非常简单的工作流程示例:

result = 0
for i in range(10):
result += 1

您可以将其重构为:

def inc(val):
return val + 1
result = 0
for i in range(10):
result = inc(val)

但你不能做的是

# results in a list of 10 futures, each of which results in the number 2
result = client.map(inc, val=result)

最新更新