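Hello fellow programmers!
I am trying to implement multiprocessing in a class to reduce my program's processing time.
This is an abbreviated version of the program: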
import multiprocessing as mp
from functools import partial

class PlanningMachines():
    def __init__(self, machines, number_of_objectives, topology=False, episodes=None):
        ....

    def calculate_total_node_THD_func_real_data_with_topo(self):
        self.consider_topology = True
        func_part = partial(self.worker_function, consider_topology=self.consider_topology,
                            list_of_machines=self.list_of_machines, next_state=self.next_state, phase=phase, grid_topo=self.grid_topo,
                            total_THD_for_all_timesteps_with_topo=total_THD_for_all_timesteps_with_topo,
                            smallest_harmonic=smallest_harmonic, pol2cart=self.pol2cart, cart2pol=self.cart2pol,
                            total_THD_for_all_timesteps=total_THD_for_all_timesteps, harmonics_state_phase=harmonics_state_phase,
                            episode=self.episode, episodes=self.episodes, time_=self.time_, steplength=self.steplength,
                            longest_measurement=longest_measurement)
        with mp.Pool() as mpool:
            mpool.map(func_part, range(0, longest_measurement))

    def worker_function(measurement=None, consider_topology=None, list_of_machines=None, next_state=None, phase=None,
                        grid_topo=None, total_THD_for_all_timesteps_with_topo=None, smallest_harmonic=None, pol2cart=None,
                        cart2pol=None, total_THD_for_all_timesteps=None, harmonics_state_phase=None, episode=None,
                        episodes=None, time_=None, steplength=None, longest_measurement=None):
        .....
As you may know, one way to implement parallel processing is to use multiprocessing.Pool().map:
with mp.Pool() as mpool:
    mpool.map(func_part, range(0, longest_measurement))
This function needs a worker_function, which can be "packed" with functools.partial:
func_part = partial(self.worker_function, consider_topology=self.consider_topology,
                    list_of_machines=self.list_of_machines, next_state=self.next_state, phase=phase, grid_topo=self.grid_topo,
                    total_THD_for_all_timesteps_with_topo=total_THD_for_all_timesteps_with_topo,
                    smallest_harmonic=smallest_harmonic, pol2cart=self.pol2cart, cart2pol=self.cart2pol,
                    total_THD_for_all_timesteps=total_THD_for_all_timesteps, harmonics_state_phase=harmonics_state_phase,
                    episode=self.episode, episodes=self.episodes, time_=self.time_, steplength=self.steplength,
                    longest_measurement=longest_measurement)
When I try to execute mpool.map(func_part, range(0, longest_measurement)), this error is thrown:
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\pool.py", line 44, in mapstar
    return list(map(*args))
TypeError: worker_function() got multiple values for argument 'consider_topology'
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "C:/Users/Artur/Desktop/RL_framework/train.py", line 87, in <module>
    main()
  File "C:/Users/Artur/Desktop/RL_framework/train.py", line 77, in main
    duration = cf.training(episodes, env, agent, filename, topology=topology, multi_processing=multi_processing, CPUs_used=CPUs_used)
  File "C:\Users\Artur\Desktop\RL_framework\help_functions\custom_functions.py", line 166, in training
    save_interval = parallel_training(range(episodes), env, agent, log_data_qvalues, log_data, filename, CPUs_used)
  File "C:\Users\Artur\Desktop\RL_framework\help_functions\custom_functions.py", line 54, in parallel_training
    next_state, reward = env.step(action, state) # given the action, the environment gives back the next_state and the reward for the transaction for all objectives seperately
  File "C:\Users\Artur\Desktop\RL_framework\help_functions\environment_machines.py", line 127, in step
    self.calculate_total_node_THD_func_real_data_with_topo() # THD_plant calculation with considering grid topo
  File "C:\Users\Artur\Desktop\RL_framework\help_functions\environment_machines.py", line 430, in calculate_total_node_THD_func_real_data_with_topo
    mpool.map(func_part, range(longest_measurement))
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\pool.py", line 657, in get
    raise self._value
TypeError: worker_function() got multiple values for argument 'consider_topology'
Process finished with exit code 1
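How can consider_topology have multiple values when it is set before being passed to worker_function:
self.consider_topology = True
I hope I have described my problem clearly enough for you to understand. Thank you for your replies.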
I think the problem is that your worker_function should be a static method.
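A minimal sketch of that change, with the signature cut down to three parameters and a placeholder body (both are assumptions for illustration, not the original calculation):

```python
import multiprocessing as mp
from functools import partial


class PlanningMachines:
    """Stripped-down stand-in for the class in the question."""

    def __init__(self):
        self.consider_topology = True

    @staticmethod
    def worker_function(measurement=None, consider_topology=None, steplength=None):
        # No implicit instance argument: `measurement` really is the first
        # positional parameter. The body is a placeholder for the real work.
        return measurement * steplength if consider_topology else 0

    def calculate(self, longest_measurement):
        # Everything except `measurement` is fixed by keyword.
        func_part = partial(self.worker_function,
                            consider_topology=self.consider_topology,
                            steplength=2)
        with mp.Pool(2) as mpool:
            # map supplies each value of the range as `measurement`.
            return mpool.map(func_part, range(longest_measurement))


if __name__ == "__main__":
    print(PlanningMachines().calculate(4))  # [0, 2, 4, 6]
```

Keeping it a regular method and adding self as the first parameter should also remove the TypeError, but then the whole instance has to be pickled and sent to the worker processes.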
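What happens now is that your partial call supplies every argument except measurement, presumably because that is the one value you want to vary.
However, because worker_function is an instance method accessed through self, the instance is also passed automatically as the first argument. You did not define self as the first parameter of worker_function, so the class instance ends up as your measurement input. Each value that map takes from range(0, longest_measurement) is then inserted as the second positional argument. Since consider_topology is the second parameter, the function receives two values for it: one from the partial call and one from the map call.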
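This binding can be reproduced without a pool. A small sketch, where Demo is a hypothetical stand-in class and a direct call takes the place of one map step:

```python
from functools import partial


class Demo:
    # Deliberately missing `self`, like worker_function in the question.
    def worker_function(measurement=None, consider_topology=None):
        return measurement, consider_topology


demo = Demo()
func_part = partial(demo.worker_function, consider_topology=True)

error_message = None
try:
    # Equivalent to Demo.worker_function(demo, 0, consider_topology=True):
    # demo fills `measurement`, 0 fills `consider_topology` positionally,
    # and the keyword from partial tries to fill `consider_topology` again.
    func_part(0)
except TypeError as exc:
    error_message = str(exc)
    print(error_message)
```

The printed message ends with the same "got multiple values for argument 'consider_topology'" text as the traceback in the question.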