我正在开发一个管道,其中包括从机器学习模型中获得预测,我正在尝试使用ray来加快速度。输入可能会重复,所以我想在函数中共享一个缓存,这样这个远程函数的所有工作人员都可以共享对缓存的访问,并对其进行搜索和获取值。类似下面的
@ray.remote
def f(x):
# create inputs from x
# do work
unknown_y1 = []
obtained_y1 = []
for index, y in enumerate(y1):
key = '|'.join([str(x) for x in y.values()])
if key in cached:
obtained_y1.append(cached[key])
else:
obtained_y1.append(np.inf)
unknown_y1.append(promo)
unknown_y2 = []
obtained_y2 = []
for index, y in enumerate(y2):
key = '|'.join([str(x) for x in y.values()])
if key in cached:
obtained_y2.append(cached[key])
else:
obtained_y2.append(np.inf)
unknown_y2.append(baseline)
known_y1, known_y2 = predictor.predict(unknown_y1,unknown_y2)
unknown_index = 0
for index in range(len(y1)):
if(obtained_y1[index] == np.inf):
obtained_y1[index] = known_y1[unknown_index]
key = '|'.join([str(x) for x in y1[index].values()])
if not(key in cached):
cached[key] = obtained_y1[index]
unknown_index = unknown_index+1
unknown_index = 0
for index in range(len(y2)):
if(obtained_y2[index] == np.inf):
obtained_y2[index] = known_y2[unknown_index]
key = '|'.join([str(x) for x in y2[index].values()])
if not(key in cached):
cached[key] = obtained_y2[index]
unknown_index = unknown_index+1
我曾尝试通过在脚本顶部添加global cached;cached=dict()
来创建一个全局字典,但该变量似乎是不同的工作程序版本,并且不共享数据。以前我使用dogpile.cache.redis
进行此操作,但该区域将不可序列化,因为它使用线程锁。我也尝试过创建一个dict,并使用ray.put(cached)
将其放入ray的对象存储中,但我想我在某个地方读到ray无法在内存中共享字典
我目前正在尝试从每个工作线程返回缓存,并将它们合并到main中,然后再次将它们放入对象存储中。是否有更好的方法在射线工作者之间共享字典/缓存?
您可能对这个关于为Ray编写函数缓存的问题/答案感兴趣。Ray actor函数的缓存实现
您的想法是正确的,但我认为您缺少的关键细节是,您应该使用Ray在actor或对象存储中保持全局状态(如果它是不可变的(。
在您的情况下,看起来您正在尝试缓存远程功能的一部分,而不是整个功能。你可能想要这样的东西。
以下是您可以考虑如何编写函数的简化版本。
@ray.remote
class Cache:
def __init__(self):
self.cache = {}
def put(self, x, y):
self.cache[x] = y
def get(self, x):
return self.cache.get(x)
global_cache = Cache.remote()
@ray.remote
def f(x):
all_inputs = list(range(x)) # A simplified set of generated inputs based on x
obtained_output = ray.get([global_cache.get(i) for i in all_inputs])
unknown_indices = []
for i, output in enumerate(obtained_output):
if output is None:
unknown_inputs.append(i)
# Now go through and calculate all the unknown inputs
for i in unknown_inputs:
output = predict(all_inputs[i]) # calculate the output
global_cache.put.remote(output) # Cache it so it's available next time
obtained_output[i] = output
return obtained_output
不幸的是,您没有创建一个最小的、可复制的示例,所以我看不出您是如何进行多处理的。为了便于论证,我假设您使用的是multiprocessing
模块中的Pool
类(concurrent.futures.ProcessPoolExecutor
作为类似的工具(。然后,您想使用管理的、可共享的dict
,如下所示:
from multiprocessing import Pool, Manager
def init_pool(the_cache):
# initialize each process in the pool with the following global variable:
global cached
cached = the_cache
def main():
with Manager() as manager:
cached = manager.dict()
with Pool(initializer=init_pool, initargs=(cached,)) as pool:
... # code that creates tasks
# required by Windows:
if __name__ == '__main__':
main()
这将在带有变量cached
的字典中创建对此字典的代理的引用。因此,所有字典访问本质上都变得更类似于远程过程调用,因此执行速度比"远程过程调用"慢得多;正常的";字典访问。只要注意。。。
如果有其他机制用于创建工作者(decorator@ray.remote
?(,则可以将cached
变量作为参数传递给函数f