在django应用程序中,我需要调用一个外部rabbitmq,在Windows服务器上运行并在那里使用一些应用程序,其中django应用程序在Linux服务器上运行。
我目前可以使用芹菜send_task
将任务添加到队列中:
app.send_task('tasks', kwargs=self.get_input(), queue=Queue('queue_async', durable=False))
我的设置如下所示:
CELERY_BROKER_URL = CELERY_CONFIG['broker_url']
BROKER_TRANSPORT_OPTIONS = {"max_retries": 3, "interval_start": 0, "interval_step": 0.2, "interval_max": 0.5}
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_DEFAULT_QUEUE = 'celery'
CELERY_TASK_RESULT_EXPIRES = 3600
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_CREATE_MISSING_QUEUES = True
我不确定的是我如何获取和解析响应,因为send_task只返回一个键?
如果要存储任务的结果,则可以使用此参数result_backend
或CELERY_RESULT_BACKEND
具体取决于您使用的芹菜版本。
配置选项的完整列表可以在这里找到(在此页面上搜索result_backend
(=> https://docs.celeryproject.org/en/stable/userguide/configuration.html
许多选项可用于存储结果 -SQL DBs , NoSQL DBs, Elasticsearch, Memcache, Redis, etc,etc
.根据您的项目堆栈进行选择。
谢谢,这有助于理解。因此,由于我想进一步处理答案,因此我使用在示例中的配置中已经定义的 rpc。
我发现这个例子很有用,因为大多数python celery示例都假设消费者是同一个应用程序,它描述了与Java应用程序Celery-Java的交互,因为它给出了一个关于如何从python端请求的很好的例子。
因此,我的实现现在是:
result = app.signature('tasks', kwargs=self.get_input(), queue=Queue('queue_async', durable=False)).delay().get()
等待并解析结果。