How to clear objects from the object store in Ray?



I am trying out the promising multiprocessing package ray. I have a problem I can't seem to solve. My program runs fine the first time, but on a second run this exception is raised on the ray.put() line:

ObjectStoreFullError: Failed to put object ffffffffffffffffffffffffffffffffffffffff010000000c000000 in object store because it is full. Object size is 2151680255 bytes.
The local object store is full of objects that are still in scope and cannot be evicted. Tip: Use the `ray memory` command to list active objects in the cluster.

What I want to do: In my actual code (which I intend to write) I need to process many big_data_objects sequentially. I want to hold one big_data_object in memory at a time and perform several heavy (independent) computations on it. I want to execute those computations in parallel. When they are done, I have to replace the big_data_object in the object store with a new one and start the computations again (in parallel).

With my test script I simulate this by starting the script again without calling ray.shutdown(). If I shut Ray down with ray.shutdown(), the object store is cleared, but re-initializing takes a long time and I cannot process multiple big_data_objects sequentially as I need to.

Which sources of information I studied: I studied the Ray Design Patterns document, in particular the section "Antipattern: Closure capture of large / unserializable object", which explains what the correct pattern should look like. I also studied the getting-started guide, which led to the test script below.

Minimal example reproducing the problem: I created a test script to try this out. Here it is:

#%% Imports
import ray
import time
import psutil
import numpy as np

#%% Testing ray
# Start Ray
num_cpus = psutil.cpu_count(logical=False)
if not ray.is_initialized():
    ray.init(num_cpus=num_cpus, include_dashboard=False)

# Define function to do work in parallel
@ray.remote
def my_function(x):  # Later I will have multiple (different) my_functions to extract different features from my big_data_object
    time.sleep(1)
    data_item = ray.get(big_data_object_ref)
    return data_item[0, 0] + x

# Define large data
big_data_object = np.random.rand(16400, 16400)  # An object of approx. 2 GB. Works on my machine (16 GB RAM).
# big_data_object = np.random.rand(11600, 11600)  # An object of approx. 1 GB.
# big_data_object = np.random.rand(8100, 8100)  # An object of approx. 500 MB.
# big_data_object = np.random.rand(5000, 5000)  # An object of approx. 190 MB.
big_data_object_ref = ray.put(big_data_object)

# Start 4 tasks in parallel.
result_refs = []
# for item in data:
for item in range(4):
    result_refs.append(my_function.remote(item))

# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_refs)
print("Results: {}".format(results))

#%% Clean-up object store data - Still there is a (huge) memory leak in the object store.
for index in range(4):
    del result_refs[0]
del big_data_object_ref

What I think goes wrong: I believe I delete all references to the object store at the end of the script, so the objects should be cleared from the object store (as described here). Evidently something is wrong, because the big_data_object stays in the object store. The results, however, are deleted from the object store just fine.

Some debugging info: I inspected the object store with the ray memory command; this is what I get:

(c:\python\cenv38rl) PS C:\WINDOWS\system32> ray memory
---------------------------------------------------------------------------------------------------------------------
Object ID                                                Reference Type       Object Size   Reference Creation Site
=====================================================================================================================
; worker pid=20952
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=29368
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=17388
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=24208
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=27684
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=6860
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; driver pid=28684
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\worker.py:put_object:277 | c:\python\cenv38rl\lib\site-packages\ray\worker.py:put:1489 | c:\python\cenv38rl\lib\site-packages\ray\_private\client_mode_hook.py:wrapper:47 | C:\Users\Stefan\Documents\Python examples\Multiprocess_Ray3_SO.py:<module>:42
---------------------------------------------------------------------------------------------------------------------
--- Aggregate object store stats across all nodes ---
Plasma memory usage 2052 MiB, 1 objects, 77.41% full

Some things I tried: If I replace my_function with:

@ray.remote
def my_function(x):  # Later I will have multiple different my_functions to extract separate features from my big_data_objects
    time.sleep(1)
    # data_item = ray.get(big_data_object_ref)
    # return data_item[0, 0] + x
    return 5

then the script clears the object store successfully, but my_function can no longer use the big_data_object, which I need it to.

My question is: How do I fix my code so that the big_data_object is removed from the object store at the end of the script, without shutting Ray down?

Note: I installed Ray with pip install ray, which gave me the version I am now using, ray==1.2.0. I use Ray on Windows and develop in Spyder v4.2.5 in a conda (actually miniconda) environment, in case that is relevant.

Edit: I also tested on an Ubuntu machine with 8 GB RAM. For this I used a big_data_object of 1 GB. I can confirm the problem occurs on that machine as well.

Output of ray memory:

(SO_ray) stefan@stefan-HP-ZBook-15:~/Documents/Ray_test_scripts$ ray memory 
---------------------------------------------------------------------------------------------------------------------
Object ID                                                Reference Type       Object Size   Reference Creation Site
=====================================================================================================================
; worker pid=18593
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
; worker pid=18591
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
; worker pid=18590
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
; driver pid=17712
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   (put object)  | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/_private/client_mode_hook.py:wrapper:47 | /home/stefan/Documents/Ray_test_scripts/Multiprocess_Ray3_SO.py:<module>:43 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/spyder_kernels/customize/spydercustomize.py:exec_code:453
; worker pid=18592
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
---------------------------------------------------------------------------------------------------------------------
--- Aggregate object store stats across all nodes ---
Plasma memory usage 1026 MiB, 1 objects, 99.69% full

I have to run the program in Spyder so that after it finishes I can inspect the object store's memory with ray memory. If I run the program in PyCharm, for example, Ray terminates automatically when the script completes, so I cannot check whether the script clears the object store as intended.

The problem is that your remote function captures big_data_object_ref, and the reference from there is never deleted. Note that when you do something like this:

# Define function to do work in parallel
@ray.remote
def my_function(x):  # Later I will have multiple (different) my_functions to extract different features from my big_data_object
    time.sleep(1)
    data_item = ray.get(big_data_object_ref)
    return data_item[0, 0] + x

# Define large data
big_data_object = np.random.rand(16400, 16400)
big_data_object_ref = ray.put(big_data_object)

big_data_object_ref is serialized into the remote function definition. There is therefore a permanent pointer to the object until this serialized function definition (which lives inside Ray's internals) is removed.
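This pinning behavior has a plain-Python analogue (no Ray required): as long as a function object holds a reference to a piece of data, deleting your own name for that data does not free it. A minimal sketch, using weakref to observe when the object actually dies (it assumes CPython's reference counting, which collects immediately when the last reference drops):

```python
import weakref

class BigData:
    """Stand-in for a large object (plain lists cannot be weak-referenced)."""
    def __init__(self, value):
        self.value = value

def make_task(data):
    def task(x):
        return data.value + x  # the closure cell keeps `data` alive
    return task

data = BigData(100)
probe = weakref.ref(data)   # lets us check whether the object is still alive
task = make_task(data)

del data                    # our own name for the object is gone...
assert probe() is not None  # ...but the function's closure still pins it
assert task(1) == 101

del task                    # dropping the function releases the last reference
assert probe() is None      # only now is the object actually collected
```

In Ray the role of `task` is played by the exported remote function definition, so a captured big_data_object_ref lives for as long as that definition does.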

Use this type of pattern instead:

#%% Imports
import ray
import time
import psutil
import numpy as np

#%% Testing ray
# Start Ray
num_cpus = psutil.cpu_count(logical=False)
if not ray.is_initialized():
    ray.init(num_cpus=num_cpus, include_dashboard=False)

# Define function to do work in parallel
@ray.remote
def my_function(big_data_object, x):
    time.sleep(1)
    return big_data_object[0, 0] + x

# Define large data
# big_data_object = np.random.rand(16400, 16400)  # An object of approx. 2 GB. Works on my machine (16 GB RAM).
# big_data_object = np.random.rand(11600, 11600)  # An object of approx. 1 GB.
big_data_object = np.random.rand(8100, 8100)  # An object of approx. 500 MB.
# big_data_object = np.random.rand(5000, 5000)  # An object of approx. 190 MB.
big_data_object_ref = ray.put(big_data_object)
print("ref in a driver ", big_data_object_ref)

# Start 4 tasks in parallel.
result_refs = []
# for item in data:
for item in range(4):
    result_refs.append(my_function.remote(big_data_object_ref, item))

# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_refs)
print("Results: {}".format(results))
print(result_refs)

#%% Clean-up object store data
# for index in range(4):
#     del result_refs[0]
del result_refs
del big_data_object_ref
time.sleep(1000)

The difference is that we now pass big_data_object_ref as an argument to the remote function instead of capturing it inside the remote function.
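For comparison, here is the same pass-the-data-as-an-argument structure sketched with a stdlib executor. This is only an analogy, not Ray: a ThreadPoolExecutor shares memory rather than using an object store, but the shape of the fix is identical, and it shows that nothing about the worker function's definition keeps the data alive.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def my_function(big_data_object, x):
    # The data arrives as an explicit argument; nothing is captured in the
    # function definition, so no hidden reference can outlive the calls.
    return big_data_object[0] + x

big_data_object = list(range(1000))  # stand-in for the large array

with ThreadPoolExecutor(max_workers=4) as pool:
    # Bind the data to the first parameter, then map over the work items.
    results = list(pool.map(partial(my_function, big_data_object), range(4)))

print(results)  # [0, 1, 2, 3]
del big_data_object  # the only remaining reference is dropped here
```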

Note: when object references are passed to a remote function, they are automatically dereferenced, so there is no need to use ray.get() inside the remote function. If you do want to call ray.get() explicitly inside the remote function, pass the object references inside a list or dictionary as the argument to the remote function. In that case you get something like:

# Remote function
@ray.remote
def my_function(big_data_object_ref_list, x):
    time.sleep(1)
    big_data_object = ray.get(big_data_object_ref_list[0])
    return big_data_object[0, 0] + x

# Calling the remote function
my_function.remote([big_data_object_ref], item)

Note 2: You use Spyder, which uses an IPython console. There are currently some known issues between Ray and the IPython console. Just make sure you delete the references inside the script rather than with commands typed directly into the IPython console (otherwise the references are deleted but the items are not removed from the object store). To inspect the object store with the ray memory command while the script is running, you can add some code at the end of the script, for example:

#%% Testing ray
# ... my ray testing code
#%% Clean-up object store data
print("Wait 10 sec BEFORE deletion")
time.sleep(10)  # Now quickly use the 'ray memory' command to inspect the contents of the object store.
del result_refs
del big_data_object_ref
print("Wait 10 sec AFTER deletion")
time.sleep(10)  # Now again use the 'ray memory' command to inspect the contents of the object store.

Latest update