我正在开发一种启发式算法;"好";NP(因此CPU密集型(问题的解决方案。
我正在使用Python实现我的解决方案(我同意当速度是一个问题时,这不是最好的选择,但事实确实如此(,我正在将工作负载分配到许多子流程中,每个子流程负责探索可能的解决方案空间的一个分支。
为了提高性能,我想在所有子流程中共享在执行每个子流程期间收集的一些信息。";明显的";收集这些信息的方法是在字典中收集它们,字典的键是(冻结的(整数集,值是整数列表(或集合(。因此,共享字典对于每个子进程都必须是可读写的,但我可以放心地预期,读取将比写入频繁得多,因为子进程只有在找到"时才会写入共享字典;有趣的";并且会更频繁地阅读dict,以了解某个解决方案是否已经由其他进程进行了评估(以避免对同一分支进行两次或两次以上的探索(。我不希望这样的字典的大小超过10MB。
目前,我使用multiprocessing.Manager()
的实例实现了共享dict,该实例负责开箱即用地处理对共享dict的并发访问。然而(根据我的发现(,这种共享数据的方式是使用进程之间的管道来实现的,这比普通和简单的共享内存慢得多(此外,字典在通过管道发送之前必须经过酸洗,在收到时必须进行解拾取(。
到目前为止,我的代码如下:
# main.py
import multiprocessing as mp
import os
def worker(a, b, c, shared_dict):
while condition:
# do things
# sometimes reads from shared_dict to check if a candidate solution has already been evaluated by other process
# if not, evaluate it and store it inside the shared_dict together with some related info
return worker_result
def main():
with mp.Manager() as manager:
# setup params a, b, c, ...
# ...
shared_dict = manager.dict()
n_processes = os.cpu_count()
with mp.Pool(processes=n_processes) as pool:
async_results = [pool.apply_async(worker, (a, b, c, shared_dict)) for _ in range(n_processes)]
results = [res.get() for res in async_results]
# gather the overall result from 'results' list
if __name__ == '__main__':
main()
为了避免管道带来的开销,我想使用共享内存,但Python标准库似乎没有提供在共享内存中处理字典的直接方法。据我所知,Python标准库只为标准ctypes(使用multiprocessing.Value
和multiprocessing.Array
(提供了在共享内存中存储数据的助手,或者允许您访问共享内存的原始区域。
我不想在共享内存的原始区域中实现我自己的哈希表,因为我既不是哈希表专家,也不是并发编程专家,相反,我想知道是否有其他更快的解决方案可以满足我的需求,而不需要从零开始写所有内容。例如,我看到光线库允许以比使用管道更快的速度读取在共享内存中写入的数据,但是,一旦字典被序列化并写入共享内存区域,似乎就无法修改它。
有什么帮助吗?
你可以和演员一起表演一些小把戏。例如,如果值是不可变的,则可以将对象引用存储在dict中。然后dict本身就不会在共享内存中,但它的所有对象都会在
@ray.remote
class DictActor
def __init__(self):
self._dict = {}
def put(self, key, value):
self._dict[key] = ray.put(value)
def get(self, key):
return self._dict[key]
d = DictActor.remote()
ray.get(d.put.remote("a", np.zeros(100)))
ray.get(d.get.remote("a")) # This result is in shared memory.