我有以下项目树(用于测试目的(,我正在努力了解Celery是如何加载任务的。
app
├── __init__.py
├── app.py
├── celery.py
└── my_tasks
├── __init__.py
└── tasks.py
celery.py
包含以下用于创建Celery实例的代码:
from celery import Celery
app = Celery("app", backend="rpc://", broker="redis://localhost:6379/0")
app.autodiscover_tasks()
tasks.py
创建一个任务:
from app.celery import app
from time import sleep
@app.task
def run_tests_for_hash():
# task code here
CCD_ 3包含一个带有一个端点的FastAPI应用程序,用于创建任务
from fastapi import FastAPI
from app.call_flow_tasks.tasks import run_tests_for_hash
app = FastAPI(title="Testing Studio runners")
@app.post("/create_task")
def create_task():
results = run_tests_for_hash.delay()
return {"task_id": results.id, "status": results.status}
但试图从命令行运行Celery worker时,我遇到了一个错误:
$celery -A app worker
Traceback (most recent call last):
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/bin/celery", line 8, in <module>
sys.exit(main())
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/__main__.py", line 16, in main
_main()
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 322, in main
cmd.execute_from_commandline(argv)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 495, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/base.py", line 305, in execute_from_commandline
return self.handle_argv(self.prog_name, argv[1:])
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 487, in handle_argv
return self.execute(command, argv)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 419, in execute
).run_from_argv(self.prog_name, argv[1:], command=argv[0])
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/worker.py", line 223, in run_from_argv
return self(*args, **options)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/base.py", line 253, in __call__
ret = self.run(*args, **kwargs)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/worker.py", line 258, in run
**kwargs)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/worker/worker.py", line 96, in __init__
self.app.loader.init_worker()
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/loaders/base.py", line 114, in init_worker
self.import_default_modules()
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/loaders/base.py", line 108, in import_default_modules
raise response
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/utils/dispatch/signal.py", line 288, in send
response = receiver(signal=self, sender=sender, **named)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
return self.throw()
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
retval = fun(*final_args, **final_kwargs)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 695, in _autodiscover_tasks
return self._autodiscover_tasks_from_fixups(related_name)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 705, in _autodiscover_tasks_from_fixups
pkg for fixup in self._fixups
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 706, in <listcomp>
for pkg in fixup.autodiscover_tasks()
AttributeError: 'NoneType' object has no attribute 'autodiscover_tasks'
命令在与app
python包相同的级别上执行。如果我删除autodiscover
,它会正常工作,并且工作人员会正常加载。关于芹菜如何自动发现任务以及如何从不同模块加载任务,有什么帮助吗?
您可以组合两种类型的自动发现:
app.autodiscover_tasks() # Find tasks using celery.fixups (i.e. Django apps via INSTALLED_APPS).
app.autodiscover_tasks( # Add other tasks not included in the apps.
[
'project.other.tasks',
'another_project.another.app',
]
)
芹菜只有一个BUILTIN_FIXUPS = {'celery.fixups.django:fixup'}
,看看它的代码就明白了。
def autodiscover_tasks(self):
from django.apps import apps
return [config.name for config in apps.get_app_configs()]
来自Celery文档:
https://docs.celeryproject.org/en/stable/reference/celery.html#celery.Celery.autodiscover_tasks
您应该为autodiscover_tasks
方法提供一个包名称列表,如果您不提供,它将被委托给修复程序(例如:Django-只要您正确配置了这些设置(。
最简单的方法是在包列表下添加任务模块:
app.autodiscover_tasks(packages=['my_tasks'])
然后您就可以在命令行上运行worker了。
希望能有所帮助!
如果你的应用程序更大,可能是:
├── api
│ ├── __init__.py
│ └── v1
│ ├── __init__.py
│ └── routers
│ ├── __init__.py
│ ├── entry.py
│ └── feed.py
├── core
│ ├── __init__.py
│ ├── config.py
│ └── db.py
├── main.py
├── models
│ ├── __init__.py
...
└── workers
├── __init__.py
├── entries
│ ├── __init__.py
│ └── tasks.py
└── worker.py
在worker.py
from celery import Celery
celery_app = Celery(
"worker",
backend="redis://localhost:6379/0",
broker="amqp://localhost:5672//"
)
celery_app.autodiscover_tasks(['app.workers.entries'])