我的下一个项目需要一个调度器,因为我使用Django进行编码,所以我选择了Celery。
我正在寻找一种方法,让任务在完成时告诉Django,这样我就可以更新数据库并使用SSE告诉用户。只需将所有逻辑放入任务中,所有这些都可以非常简单地完成。但是,当我计划有几个芹菜工人时,我该怎么办?
我在网上找到了一堆关于单身工人的信息,但如果你有不止一个工人,就没有多少信息涉及这个问题。
我想到的是使用从工作人员到web服务器的http回调,让它知道任务已经完成。看着celery.task.http看起来很有希望,但没有做我需要的事。
解决方案是使用信号并连接手动http调用吗?还是我走错了路?这不是一个常见的问题吗?如何才能更优雅地解决这个问题?
那么,告诉Django是什么意思?我理解你的意思是对的,启动了一个Celery任务的django请求,当这个任务完成时还活着吗?在这种情况下,您可以检查一些存储(数据库、memcached等)。并发送您的SSE。看,有一种方法可以做到这一点。1.您将django视图发送任务到Celery,然后它进入无限循环(或超时60秒的循环?),并在memcached中等待结果。
-
Celery获取任务执行,并将结果粘贴到memcached中。
-
Django视图获得新结果,退出循环并发送SSE。
下一个变体是
-
Django视图将任务发送给Celery,并返回
-
Celery执行任务,执行后会向您的django应用程序发出简单的HTTP请求。
-
Django收到来自Celery的http请求,解析params并再次向用户发送SSE
以下是一些似乎可以实现我想要的功能的代码:
在django设置中:
CELERY_ANNOTATIONS = {
"*": {
"on_failure": celery_handlers.on_failure,
"on_success": celery_handlers.on_success
}
}
在包含的celey_handlers.py文件中:
def on_failure(self, exc, task_id, *args, **kwargs):
# Use urllib or similar to poke eg; api-int.mysite.com/task_handler/TASK_ID
pass
def on_success(self, retval, task_id, *args, **kwargs):
# Use urllib or similar to poke eg; api-int.mysite.com/task_handler/TASK_ID
pass
然后你可以设置api int来使用这样的东西:
from celery.result import AsyncResult
task_obj = AsyncResult(task_id)
# Logic to handle task_obj.result and related goes here....