如何并行迭代 SimPy 模拟?



>我有一个 SimPy 模型,它返回一个随机结果,我想多次复制。每个复制都是独立的,所以为了更快,我想并行运行它们。我尝试过Python的多处理,Pathos多处理和joblib Parallel,但是每种方法我都得到相同的错误:TypeError: can't pickle generator objects。有没有办法避免此错误并并行运行模拟?

SimPy 依赖于生成器,如此处所述,因此无法避免它们。

该错误很好地描述了问题。在你发送到子进程的对象中的某个地方,一个生成器潜伏着,大概在函数参数中。是否可以将此生成器转换为列表?

例如,以下内容会引发您提到的错误:

from multiprocessing import Pool
def firstn(n):
k = 0
while k < n:
yield k
k += 1
if __name__ == "__main__":
p = Pool(2)
print(p.map(firstn, [1, 2, 3, 4]))

但是这个有效:

from multiprocessing import Pool
def firstn(n):
k = 0
while k < n:
yield k
k += 1
def wrapped(n):
return list(firstn(n))
if __name__ == "__main__":
p = Pool(2)
print(p.map(wrapped, [1, 2, 3, 4]))

您需要在新进程中从头开始实例化环境,并注意仅使用原版类型作为要在Pool中映射的参数。这是一个重新设计的洗车示例(来自simpy文档的示例(,它使用不同的种子运行 4 个并行模拟,并打印每种情况下洗了多少辆汽车。

import multiprocessing as mp
import simpy
import random

NUM_MACHINES = 2  # Number of machines in the carwash
WASHTIME = 5      # Minutes it takes to clean a car
T_INTER = 7       # Create a car every ~7 minutes
SIM_TIME = 20     # Simulation time in minutes

class Carwash(object):
"""A carwash has a limited number of machines (``NUM_MACHINES``) to
clean cars in parallel.
Cars have to request one of the machines. When they got one, they
can start the washing processes and wait for it to finish (which
takes ``washtime`` minutes).
"""
def __init__(self, env, num_machines, washtime):
self.env = env
self.machine = simpy.Resource(env, num_machines)
self.washtime = washtime
def wash(self, car):
"""The washing processes. It takes a ``car`` processes and tries
to clean it."""
yield self.env.timeout(WASHTIME)

def car(env, name, cw):
"""The car process (each car has a ``name``) arrives at the carwash
(``cw``) and requests a cleaning machine.
It then starts the washing process, waits for it to finish and
leaves to never come back ...
"""
with cw.machine.request() as request:
yield request
yield env.process(cw.wash(name))

def setup(env, num_machines, washtime, t_inter):
"""Create a carwash, a number of initial cars and keep creating cars
approx. every ``t_inter`` minutes."""
# Create the carwash
carwash = Carwash(env, num_machines, washtime)
# Create 4 initial cars
for i in range(4):
env.process(car(env, 'Car %d' % i, carwash))
# Create more cars while the simulation is running
while True:
yield env.timeout(random.randint(t_inter - 5, t_inter + 5))
i += 1
env.i = i
env.process(car(env, 'Car %d' % i, carwash))

# additional wrapping function to be executed by the pool
def do_simulation_with_seed(rs):
random.seed(rs)  # This influences only the specific process being run
env = simpy.Environment()  # THE ENVIRONMENT IS CREATED HERE, IN THE CHILD PROCESS
env.process(setup(env, NUM_MACHINES, WASHTIME, T_INTER))
env.run(until=SIM_TIME)
return env.i

if __name__ == '__main__':
seeds = range(4)
carwash_pool = mp.Pool(4)
ncars_by_seed = carwash_pool.map(do_simulation_with_seed, seeds)
for s, ncars in zip(seeds, ncars_by_seed):
print('seed={} --> {} cars washed'.format(s, ncars))

最新更新