我正在进行一个研究项目,希望应用并行化来提高执行速度。我以前使用过multiprocessing
库,但只用于数字运算。我将试着简要描述一下我的设定和目标。我主要希望有一个想法,来自对多处理概念更有经验的人。
项目:
该项目是一个多echolon供应链模拟(多级分销网络),根据传入的需求,在每个地点定期做出重新订购决定。玩具示例如下:
Level 3 Level 2 Level 1 Level 0
--- Local Warehouse 1
|
--- Central Warehouse 1 --
| |
| --- Local Warehouse 2
|
Supplier -- Customer
| --- Local Warehouse 3
| |
--- Central Warehouse 2 --
|
--- Local Warehouse 4
模拟对象(简化)如下:
class Simulation:
self.locations = dict() #List of locations
self.customer = Customer() #Object periodically ordering at deepest level (Local) Warehouses
self.levels = {0: [], 1:[],..} # Locations by depth in network graph
def run(self):
for period in simulation_length:
for level in self.levels:
for location in level:
#review orders and issue order if required
class Location:
self.orders = [] #list of received orders
def review(self):
#Decides based on received orders if reorder required
def order(self, order, other_location):
simulation.locations[other_location].orders.append(order)
因此程序如下:
- 客户(0级)向本地仓库(1级)发出订单
- 本地仓库(1级)审核订单并向中央仓库(2级)发出订单
- 以此类推,直到供应商
- 下一个期间
我的问题/想法
目前,我有一个属于供应链特定级别的所有仓库的dict
,并且我在每个阶段按顺序遍历每个级别中的每个仓库(因此满足依赖关系)。级别的数量是有限的,但每个级别的仓库数量很大,而且审查逻辑可能是计算密集型的,因此我的计划是并行审查属于同一级别的所有仓库。
但是,由于位置使用访问模拟对象中另一个对象的属性的函数order(self, order, other_location)
,因此我需要在进程之间共享整个模拟对象。
想法和方法:
- 每当下订单时,将
sumulation object
放在shared memory
中,并在对象上使用Lock
(评审中的所有其他操作都是纯读取操作) - 与其直接下订单,不如将它们放在
Queue
中到主流程,并在一级退货内的所有仓库之后,只执行订单函数(计算成本低廉)
(1)的问题:
根据我的研究,只有CType
对象Value
和Array
可以放在共享内存中。我不知道怎么回事。我唯一读到的是multiprocessing Manager
,但林克提出的另一个stackerflow问题是,它不适用于嵌套对象。
(2)的问题:
由于每个仓库对象在不同时期之间发生变化(订单到达、库存变化等),我必须在每个时期将仓库对象移交给流程,以便它是最新的,这将产生很大的开销(至少我认为是这样)
结论
我希望清楚我想要实现什么。任何暗示、澄清或纠正我的误解都将是很棒的!
编辑@Roy12:的回答
谢谢你的回答。我肯定会看看Dask,因为它的最终目标是利用集群。关于第一个提示,我想到了两个实现,非常感谢您的建议:我的位置需要接收和发送订单对象,发送部分由对象本身控制,而接收则不是。因此,对我来说,选项1是
在周期开始时,使用最新位置对象的派生进程进行计算,而不是直接发送订单,而是将它们放入队列并关闭进程。当完成整个级别时,主流程会分配订单并生成下一级别的流程,依此类推。这导致了有规律的产卵和关闭过程,并且根据模拟长度,定位对象会变得相当大
我在开始时静态地将位置映射到流程,并有一个传入队列和一个传出队列,让主流程分发订单。例如,流程1(位置1)向流程2(位置2)发送订单将是->流程1->主流程->流程2。在这种情况下,每次处理订单并执行例行程序(读取队列->重新计算->将订单发送到队列)时,都需要向流程提供一个信号
(2)对我来说似乎更复杂,但我对缺点没有感觉,除了收集最终必须编程。如果重要的话,订单对象的大小约为40字节,那么在整个运行过程中,位置对象(仓库)将增长到约15mb
一个不错的用例。一些想法/建议:
- 不要使用共享内存。这几天被认为是不好的做法。在过去,人们常常使用共享内存进行并发,但现代的方法是尽可能避免这种情况。例如,Go语言提供了一些不错的替代方案(请参阅https://blog.golang.org/codelab-share)。共享内存的另一个缺点是你不能在多台机器上分配你的工作
- 使用队列通常要好得多。如果您要向后移动的数据以及进程之间的数据不是很大——很多(很多)兆字节——那么开销将可以忽略不计
- 对于您的用例,您可能需要考虑使用分布式计算框架,如Dask。它提供了从子任务中收集结果的简单方法,然后才开始在层次结构中的下一级工作。此外,它将允许您将工作分布在整个集群中,而不仅仅是一台机器
希望这能有所帮助。
更新以下一些规模数据:
问题指出,一个位置的大小为15MB,订单的大小约为40字节(明显更小)。
考虑到这一点,很明显,如果我们针对低网络流量进行优化,我们将选择#1模型,在该模型中,每个位置都是一个在整个模拟过程中存在的过程,并与其他位置通信查看队列和消息。
但是——这是一个很大的问题——通过队列运行所有通信似乎是一个更复杂的实现。创建一个包含15MB数据的进程需要不到一秒钟的时间。如果每个位置的计算都是非平凡的,那么它可能需要比流程创建本身多得多的时间。出于这个原因,我可能会从更简单的实现开始(为每个位置生成一个新流程)。
换句话说,围绕队列构建整个系统似乎有些过早的优化。
最后要注意的是:有一个名为SimPy的Python模拟包。我不知道它的可扩展性有多大,但它可能值得一看。