芹菜任务后的HTTP调用已更改状态



我的下一个项目需要一个调度器,因为我使用Django进行编码,所以我选择了Celery。

我正在寻找一种方法,让任务在完成时告诉Django,这样我就可以更新数据库并使用SSE告诉用户。只需将所有逻辑放入任务中,所有这些都可以非常简单地完成。但是,当我计划有几个芹菜工人时,我该怎么办?

我在网上找到了一堆关于单身工人的信息,但如果你有不止一个工人,就没有多少信息涉及这个问题。

我想到的是使用从工作人员到web服务器的http回调,让它知道任务已经完成。看着celery.task.http看起来很有希望,但没有做我需要的事。

解决方案是使用信号并连接手动http调用吗?还是我走错了路?这不是一个常见的问题吗?如何才能更优雅地解决这个问题?

那么,告诉Django是什么意思?我理解你的意思是对的,启动了一个Celery任务的django请求,当这个任务完成时还活着吗?在这种情况下,您可以检查一些存储(数据库、memcached等)。并发送您的SSE。看,有一种方法可以做到这一点。1.您将django视图发送任务到Celery,然后它进入无限循环(或超时60秒的循环?),并在memcached中等待结果。

  1. Celery获取任务执行,并将结果粘贴到memcached中。

  2. Django视图获得新结果,退出循环并发送SSE。

下一个变体是

  1. Django视图将任务发送给Celery,并返回

  2. Celery执行任务,执行后会向您的django应用程序发出简单的HTTP请求。

  3. Django收到来自Celery的http请求,解析params并再次向用户发送SSE

以下是一些似乎可以实现我想要的功能的代码:

在django设置中:

CELERY_ANNOTATIONS = {
    "*": {
        "on_failure": celery_handlers.on_failure,
        "on_success": celery_handlers.on_success
    }
}

在包含的celey_handlers.py文件中:

def on_failure(self, exc, task_id, *args, **kwargs):
    # Use urllib or similar to poke eg; api-int.mysite.com/task_handler/TASK_ID
    pass
def on_success(self, retval, task_id, *args, **kwargs):
    # Use urllib or similar to poke eg; api-int.mysite.com/task_handler/TASK_ID
    pass

然后你可以设置api int来使用这样的东西:

from celery.result import AsyncResult
task_obj = AsyncResult(task_id)
# Logic to handle task_obj.result and related goes here....

相关内容

  • 没有找到相关文章

最新更新