我有一个相当复杂的递归函数,其中包含许多参数(如果有人想知道的话,Obara-Saika-Scheme),我想更有效地进行演示。 作为第一步,我应用了@functools.lru_cache
.作为第二步,我现在想使用multiprocessing.Pool
异步计算一长串输入参数。
改编 functools Python 文档中的第二个示例并添加我拥有的工人池:
from multiprocessing import Pool
from functools import lru_cache
@lru_cache(maxsize=10)
def fibonacci(n):
print('calculating fibonacci(%i)' %n)
if n < 2:
return n
return fibonacci(n-1)+fibonacci(n-2)
with Pool(processes=4) as pool:
for i in range(10):
res = pool.apply_async(fibonacci, (i,))
print(res.get())
print(fibonacci.cache_info())
问题1
如何让缓存在不同的工作线程之间共享。另一个问题(如何共享缓存?)问类似的事情,但我无法让它工作。这是我对此的 2 种失败方法。
使用multiprocessing.Pool
:
from multiprocessing import Pool
from functools import lru_cache
import time
@lru_cache(maxsize=10)
def fibonacci(n):
print('calculating fibonacci(%i)' %n) # log whether the function gets called
if n < 2:
return n
return fibonacci(n-1)+fibonacci(n-2)
res = []
with Pool(processes=4) as pool:
# submit first task
res.append(pool.apply_async(fibonacci, (5,)).get())
# give fibonacci() some time to fill its cache
time.sleep(1)
# submit second task
res.append(pool.apply_async(fibonacci, (3,)).get())
print(res)
使用concurrent.futures
:
import concurrent.futures
from functools import lru_cache
import time
@lru_cache(maxsize=10)
def fibonacci(n):
print('calculating fibonacci(%i)' %n) # log whether the function gets called
if n < 2:
return n
return fibonacci(n-1)+fibonacci(n-2)
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
@lru_cache(maxsize=10)
def fib_async(n):
print('calculating fib_async(%i)' %n)
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)
res = []
# submit first task
res.append(executor.submit(fib_async, 5))
# give fib_async() some time to fill its cache
time.sleep(1)
# submit second task
res.append(executor.submit(fib_async, 3))
res = [e.result() for e in res]
print(res)
两者都产生基本相同的输出,表明第二个任务重新计算fibonacci(2)
,尽管第一个任务已经必须计算它。如何共享缓存?
这应该会加快速度,但如果重复调用的时机不当,仍然存在问题:当前由 worker1 评估的调用尚未缓存,worker2 可能会开始评估相同的内容。这让我想到:
问题2
计算斐波那契数的递归是相当线性的,即只有一个参数被递减。我的函数更复杂,我可以使用一些东西来管理已经计算的输入参数,还可以跟踪当前正在计算的内容。
需要明确的是:我想对递归函数进行许多并行调用,这将产生许多对递归函数的新调用。
一个棘手的事情可能是避免将一个调用直接分配给工作线程,因为当递归深度超过工作线程数量时,这会导致死锁。
我已经可以使用这样的东西了吗?还是我需要自己构建一些东西?我偶然发现了multiprocessing.managers
和concurrent.futures.ProcessPoolExecutor
可能会有所帮助。但我可以使用一些帮助来开始。
由于所需的功能受 CPU 限制,因此选择multiprocessing
来完成此任务是正确的。
该函数@lru_cache
使用内存中的缓存。每个 python 进程都包含自己的内存块,因此您将生成 2 个独立的缓存(它们位于不同的内存空间上)。
如果要同步这些缓存,则需要使用某种内存同步机制,例如锁等。默认的lru_cache
方法不会进行多重处理,但您可以非常轻松地自己实现一个。
只需使用共享字典(这里是一个很好的例子)来保存缓存的项目,并使用锁包装对该字典的访问(参考这里是 python wiki 页面)。这样,您可以在保持访问安全的同时跨进程共享字典。