芹菜:无法通过烧瓶调用任务,在烧瓶外工作



>@Update: 将任务从shared_task更改为 app.celeryd.celery.task 可以解决此问题。是否有一些额外的设置可以使shared_tasks正常工作?

完全重写仍然是相同的错误。我会尽量保持相对较短。新的项目结构更简洁一些,如下所示:

proj
|-- app
|   |-- controller
|   |   |-- __init__.py
|   |   +-- greeting_model.py
|   |-- model
|   |   |-- __init__.py
|   |   +-- dto
|   |       |-- __init__.py
|   |       +-- greeting_dto.py
|   |-- service
|   |   |-- __init__.py
|   |   +-- greeting_service.py
|   |-- tasks
|   |   |-- __init__.py
|   |   +-- greeting_tasks.py
|   |-- __init__.py
|   |-- celeryd.py
|   +-- flaskd.py
|-- test.py
|-- worker.py
+-- ws.py

我正在分别初始化芹菜和烧瓶,并提供应在客户端计算机上运行的 worker.py,而 ws.py(烧瓶 Web 服务(将在另一台机器上运行。Celery 初始化非常简单,使用 rpc 后端和 RabbitMQ 代理。现在的 2 个队列是静态的,但稍后将从配置中填充这些队列。

from kombu import Queue
from celery import Celery

celery = Celery('LdapProvider',
broker='amqp://admin:passwd@localhost:5672/dev1',
backend='rpc',
include=['app.tasks.greeting_tasks'])
celery.conf.task_queues = (
Queue("q1", routing_key="c1.q1"),
Queue("q2", routing_key="c2.q2"),
)

Worker.py(用于启动芹菜工人 - 这个问题过于简化(:

from app.celeryd import celery as celery
from celery.bin.worker import worker

if __name__ == '__main__':
celeryd = worker(app=celery)
options = {
'broker': 'amqp://admin:passwd@localhost:5672/dev1',
'queues': 'q1',
'loglevel': 'info',
'traceback': True
}
celeryd.run(**options)

我将省略烧瓶初始化并跳转到调用芹菜任务的greeting_service.py:

# greeting_service.py:
from app.celeryd import celery
from app.tasks.greeting_tasks import say_hello

class GreetingService(object):
def say_hello(self, name: str) -> str:
async_result = say_hello.apply_async((name,), queue='q1')
return async_result.get()

# greeting_tasks.py
from celery import shared_task

@shared_task(bind=True)
def say_hello(self, name: str) -> str:
return name.capitalize()

无论我尝试什么,此调用都会通过烧瓶失败。我创建了 test.py 只是为了测试芹菜是否有效:

from app.celeryd import celery
from app.tasks.greeting_tasks import say_hello

if __name__ == '__main__':
async_result = say_hello.apply_async(('jackie',), queue='q1')
print(async_result.get())

与greeting_service.py几乎相同,只是它不是从greeting_controller调用的,这是一个flask_restplus命名空间。test.py 导致的差异:

/home/pupsz/PycharmProjects/provider/venv37/bin/python /home/pupsz/PycharmProjects/provider/test.py
Jackie
Process finished with exit code 0
[2020-01-16 18:56:17,065: INFO/MainProcess] Received task: app.tasks.greeting_tasks.say_hello[bb45e271-563e-405b-8529-7335a3810976]  
[2020-01-16 18:56:17,076: INFO/ForkPoolWorker-2] Task app.tasks.greeting_tasks.say_hello[bb45e271-563e-405b-8529-7335a3810976] succeeded in 0.010257695998006966s: 'Jackie'

而通过烧瓶我得到的只是已经显示的,并且工人日志没有显示任何传入的任务,这意味着通过烧瓶apply_async没有将任务发送到 RabbitMQ:

File "/home/xyz/PycharmProjects/proj/app/service/greeting_service.py", line 8, in say_hello
return async_result.get()
NotImplementedError: No result backend is configured.
Please see the documentation for more information.

我发现 django 有一个类似的问题没有答案,所以我有点卡住了,希望得到某种指导。

解决方案:此处回答了使shared_task按预期工作的解决方案:链接 将 celerz 初始化修改为:

from kombu import Queue
from celery import Celery

celery = Celery('LdapProvider',
broker='amqp://admin:passwd@localhost:5672/dev1',
backend='rpc')
# include=['app.tasks.greeting_tasks'])
celery.conf.task_queues = (
Queue("q1", routing_key="c1.q1"),
Queue("q2", routing_key="c2.q2"),
)
celery.set_default()

即使我要删除注释掉的包含行,工作人员也能成功地拾取app.tasks.greeting_tasks中定义的shared_task:

[tasks]
. app.tasks.greeting_tasks.say_hello

将应用程序设置为 default_app(( 后,即使使用 shared_task,也不会再抛出 NotImplementError。至于原因...我不知道,这是不同配置和谷歌搜索的 6 个小时的试验和错误。我发现在某些更复杂的情况下,官方文档乏善可陈。

最新更新