持续部署的芹菜



我有一个服务,它公开了一个API,然后输入任务,它是用Falcon(API(和Celery(任务管理(实现的。

具体来说,我的工作需要很长时间才能加载,他们的代码看起来像这样

class HeavyOp(celery.Task):
def __init__(self):
self._asset = get_heavy_asset()  # <-- takes long time
@property
def asset(self):
return self._asset
@app.task(base=HeavyOp)
def my_task(data):
return my_task.asset.do_something(data)

实际发生的是,在__init__函数中,某个对象从磁盘读取并保留在内存中,只要工作线程存在。

有时,我想更新该对象。

有没有办法在不停机的情况下重新加载工人?由于这一切都在 API 后面,我不希望将加载重物的几分钟作为停机时间。

我们可以假设主机具有 1 个以上的内核,但解决方案必须是单个主机解决方案。

我认为您不需要自定义基任务类。您要实现的是单个实例资产类,该资产类在工作线程初始化后加载,您可以从任务重新加载。

此方法适用于:

# worker.py
import os
import sys
import time
from celery import Celery
from celery.signals import worker_ready

app = Celery(include=('tasks',))
class Asset:
def __init__(self):
self.time = time.time()

class AssetLoader:
__shared_state = {}
def __init__(self):
self.__dict__ = self.__shared_state
if '_value' not in self.__dict__:
self.get_heavy_asset()
def get_heavy_asset(self):
self._value = Asset()
@property
def value(self):
return self._value

@worker_ready.connect
def after_worker_ready(sender, **kwargs):
AssetLoader()

在这里,我使 AssetLoader 成为 Borg 类,但您可以选择任何其他模式/策略来共享 Asset 的单个实例。为了便于说明,我只是在执行get_heavy_asset时捕获时间戳。

# tasks.py
from worker import app, AssetLoader

@app.task(bind=True)
def load(self):
AssetLoader().get_heavy_asset()
return AssetLoader().value.time
@app.task(bind=True)
def my_task(self):
return AssetLoader().value.time

请记住,资产是按工作进程共享的,而不是在工作线程之间共享的。如果你用concurrency=1运行,它没有区别,但对于其他任何事情它都会有区别。但是从我在您的用例中收集的信息来看,无论哪种方式都应该没问题。

相关内容

  • 没有找到相关文章

最新更新