How do I reconnect to a Ray cluster after the cluster restarts?



I have a question about reconnecting a FastAPI server to a Ray cluster. In FastAPI, I init/connect to the Ray cluster in the startup event:

@app.on_event("startup")
async def init_ray():
    ...
    ray.init(address=f'{ray_head_host}:{ray_head_port}', _redis_password=ray_redis_password, namespace=ray_serve_namespace)
    ...

After the Ray cluster is restarted, I run into a problem when I try to use the Ray API in some FastAPI routes:

Exception: Ray Client is not connected. Please connect by calling `ray.connect`.

So it looks like the connection from FastAPI to Ray was lost (ray.is_initialized() ==> False confirms this). However, if I try to reconnect with ray.init(), I get the following error:

Exception: ray.connect() called, but ray client is already connected

I also tried calling ray.shutdown() before the re-init call, but it did not help.

Does anyone know how to reconnect from FastAPI?

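You can create a daemon thread that periodically checks the Ray connection. If the Ray client is disconnected, reconnect by calling your startup function (init_ray() above):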
import logging
import threading
import time

import ray
from ray.util.client import ray as ray_stub

logger = logging.getLogger(__name__)


class RayConn(threading.Thread):
    """Daemon thread that re-establishes the Ray client connection if it drops."""

    def __init__(self):
        super().__init__(daemon=True)
        self.reconnect_count = 0
        self.start()

    def run(self):
        while True:
            # Check the connection every 30 seconds
            time.sleep(30)
            if not ray_stub.is_connected():
                logger.error("Ray client is disconnected. Trying to reconnect")
                try:
                    try:
                        # Tear down the stale client state before reconnecting
                        ray.shutdown()
                        logger.info("Shutdown complete.")
                    except Exception as e:
                        logger.error(f"Failed to shutdown: {e}")
                    reestablish_conn()  # your function that calls ray.init() and creates tasks, if any
                    self.reconnect_count += 1
                    logger.info(f"Successfully reconnected, reconnect count: {self.reconnect_count}")
                except Exception as ee:
                    # Keep the thread alive so it retries on the next check
                    logger.error(f"Failed to connect to ray head! {ee}")


RayConn()
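The polling idea above does not depend on anything Ray-specific, so it can be sketched with the connection check and the reconnect step passed in as plain callables. This is a minimal sketch under that assumption: `Watchdog`, `is_connected` and `reconnect` are hypothetical names, and in the Ray case you would pass something like `ray_stub.is_connected` and a function that calls `ray.shutdown()` followed by `ray.init(...)`. A `threading.Event` replaces the bare `time.sleep` so the thread can also be stopped cleanly.

```python
import logging
import threading
from typing import Callable

logger = logging.getLogger(__name__)


class Watchdog(threading.Thread):
    """Daemon thread that polls a connection check and reconnects on failure.

    Hypothetical helper: `is_connected` and `reconnect` stand in for e.g.
    ray_stub.is_connected and a function doing ray.shutdown() + ray.init().
    """

    def __init__(self, is_connected: Callable[[], bool],
                 reconnect: Callable[[], None], interval: float = 30.0):
        super().__init__(daemon=True)
        self.is_connected = is_connected
        self.reconnect = reconnect
        self.interval = interval
        self.reconnect_count = 0
        self._stop_event = threading.Event()

    def run(self) -> None:
        # Event.wait returns True once stop() is called, which ends the loop;
        # otherwise it times out after `interval` seconds, like time.sleep.
        while not self._stop_event.wait(self.interval):
            if self.is_connected():
                continue
            logger.error("Connection lost, trying to reconnect")
            try:
                self.reconnect()
                self.reconnect_count += 1
                logger.info("Reconnected, count: %d", self.reconnect_count)
            except Exception as exc:  # keep polling even if one attempt fails
                logger.error("Reconnect failed: %s", exc)

    def stop(self) -> None:
        self._stop_event.set()
```

Injecting the two callables keeps the retry loop testable without a running cluster: a fake `reconnect` that flips a flag is enough to exercise it.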

I ended up using a context manager to manage the connection to Ray.

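import ray


class RayConnection:
    def __init__(self, address, **kwargs):
        # Connect as soon as the object is created
        ray.init(address=address, **kwargs)

    def __enter__(self):
        return self

    def __exit__(self, typ, value, traceback):
        # Always disconnect when leaving the with block, even on errors
        ray.shutdown()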

Then you can wrap your Ray calls in it, so the connection is always properly closed and reopened.

with RayConnection(address=f'{ray_head_host}:{ray_head_port}'):
    print(ray.available_resources())
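The same pattern can also be written as a generator-based context manager with `contextlib.contextmanager`, where `try`/`finally` guarantees the shutdown step runs even if the body raises. In this sketch `connection`, `connect` and `disconnect` are hypothetical names standing in for `ray.init(address=...)` and `ray.shutdown()`, so the example runs without a cluster.

```python
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def connection(connect: Callable[[], None],
               disconnect: Callable[[], None]) -> Iterator[None]:
    """Hold a connection open for the duration of a `with` block.

    Hypothetical helper: in the Ray case, pass
    connect=lambda: ray.init(address=...) and disconnect=ray.shutdown.
    """
    connect()
    try:
        yield
    finally:
        # Runs on normal exit and when the body raises
        disconnect()


events = []
with connection(lambda: events.append("init"), lambda: events.append("shutdown")):
    events.append("work")
print(events)  # ['init', 'work', 'shutdown']
```

One difference from the class version: here the connection is opened when the `with` block is entered rather than when the object is constructed, so creating the manager has no side effects.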
