芹菜消耗send_task反应



在django应用程序中,我需要调用一个外部rabbitmq,在Windows服务器上运行并在那里使用一些应用程序,其中django应用程序在Linux服务器上运行。

我目前可以使用芹菜send_task将任务添加到队列中:

app.send_task('tasks', kwargs=self.get_input(), queue=Queue('queue_async', durable=False))

我的设置如下所示:

CELERY_BROKER_URL = CELERY_CONFIG['broker_url']
BROKER_TRANSPORT_OPTIONS = {"max_retries": 3, "interval_start": 0, "interval_step": 0.2, "interval_max": 0.5}
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_DEFAULT_QUEUE = 'celery'
CELERY_TASK_RESULT_EXPIRES = 3600
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_CREATE_MISSING_QUEUES = True

我不确定的是我如何获取和解析响应,因为send_task只返回一个键?

如果要存储任务的结果,则可以使用此参数result_backendCELERY_RESULT_BACKEND具体取决于您使用的芹菜版本。

配置选项的完整列表可以在这里找到(在此页面上搜索result_backend(=> https://docs.celeryproject.org/en/stable/userguide/configuration.html

许多选项可用于存储结果 -SQL DBs , NoSQL DBs, Elasticsearch, Memcache, Redis, etc,etc.根据您的项目堆栈进行选择。

谢谢,这有助于理解。因此,由于我想进一步处理答案,因此我使用在示例中的配置中已经定义的 rpc。

我发现这个例子很有用,因为大多数python celery示例都假设消费者是同一个应用程序,它描述了与Java应用程序Celery-Java的交互,因为它给出了一个关于如何从python端请求的很好的例子。

因此,我的实现现在是:

result = app.signature('tasks', kwargs=self.get_input(), queue=Queue('queue_async', durable=False)).delay().get()

等待并解析结果。

相关内容

  • 没有找到相关文章

最新更新