我正在运行一个烧瓶应用程序,该应用程序旨在接收零星但繁重的工作。我实现了一系列运行flask应用程序,celery,redis(作为代理(和memcached(作为后端(的docker容器。我使用芹菜将处理权重分叉成块,然后使用 get(( 检索所有结果:
# Multithreading
jobs = group(processing_fn.s(c) for c in chunks)
result = jobs.apply_async()
while not result.ready() :
time.sleep(30)
resultset = result.get()
虽然这确实可以快速而很好地工作,但我在使用芹菜后订购芹菜以释放用于存储任务结果的 RAM 时遇到了麻烦。最终,服务器内存不足,必须重新启动,这远非最佳状态。
我尝试对结果集(甚至结果集中的每个结果(使用.forget
:
result = result.get()
result.forget()
...
resultset = result.get()
for r in result :
r.forget()
然而,这些都没有释放记忆...有什么想法吗?
以下是芹菜应用的实例化方式:
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
broker = "redis://redis:6379/",
backend = "cache+memcached://memcached:11211"
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
...
celery = make_celery(app)
默认情况下,芹菜会将任何任务的结果存储 1 天:
看: 您可以通过以下方式调整它:CELERY_TASK_RESULT_EXPIRES或 https://docs.celeryproject.org/en/latest/userguide/configuration.html#result-expires