我有一个服务,它公开了一个API,然后输入任务,它是用Falcon(API(和Celery(任务管理(实现的。
具体来说,我的工作需要很长时间才能加载,他们的代码看起来像这样
class HeavyOp(celery.Task):
def __init__(self):
self._asset = get_heavy_asset() # <-- takes long time
@property
def asset(self):
return self._asset
@app.task(base=HeavyOp)
def my_task(data):
return my_task.asset.do_something(data)
实际发生的是,在__init__
函数中,某个对象从磁盘读取并保留在内存中,只要工作线程存在。
有时,我想更新该对象。
有没有办法在不停机的情况下重新加载工人?由于这一切都在 API 后面,我不希望将加载重物的几分钟作为停机时间。
我们可以假设主机具有 1 个以上的内核,但解决方案必须是单个主机解决方案。
我认为您不需要自定义基任务类。您要实现的是单个实例资产类,该资产类在工作线程初始化后加载,您可以从任务重新加载。
此方法适用于:
# worker.py
import os
import sys
import time
from celery import Celery
from celery.signals import worker_ready
app = Celery(include=('tasks',))
class Asset:
def __init__(self):
self.time = time.time()
class AssetLoader:
__shared_state = {}
def __init__(self):
self.__dict__ = self.__shared_state
if '_value' not in self.__dict__:
self.get_heavy_asset()
def get_heavy_asset(self):
self._value = Asset()
@property
def value(self):
return self._value
@worker_ready.connect
def after_worker_ready(sender, **kwargs):
AssetLoader()
在这里,我使 AssetLoader 成为 Borg 类,但您可以选择任何其他模式/策略来共享 Asset 的单个实例。为了便于说明,我只是在执行get_heavy_asset
时捕获时间戳。
# tasks.py
from worker import app, AssetLoader
@app.task(bind=True)
def load(self):
AssetLoader().get_heavy_asset()
return AssetLoader().value.time
@app.task(bind=True)
def my_task(self):
return AssetLoader().value.time
请记住,资产是按工作进程共享的,而不是在工作线程之间共享的。如果你用concurrency=1
运行,它没有区别,但对于其他任何事情它都会有区别。但是从我在您的用例中收集的信息来看,无论哪种方式都应该没问题。