我正在尝试运行一个简单的Celery示例:芹菜示例本地文件系统。
这是一个任务模块:
#tasks.py
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
return x + y
这是一个配置:
#celeryconfig.py
"""Celery configuration using local filesystem only."""
from pathlib import Path
# paths for file backend, create folders
_root = Path(__file__).parent.resolve().joinpath('data')
#_root = Path('c:\temp').parent.resolve().joinpath('data')
_backend_folder = _root.joinpath('results')
_backend_folder.mkdir(exist_ok=True, parents=True)
_folders = {
'data_folder_in': _root.joinpath('in'),
'data_folder_out': _root.joinpath('in'), # has to be the same as 'data_folder_in'
'processed_folder': _root.joinpath('processed')
}
for fn in _folders.values():
fn.mkdir(exist_ok=True)
# celery config
result_backend = 'file://{}'.format(str(_backend_folder))
broker_url = 'filesystem://'
broker_transport_options = {k: str(f) for k, f in _folders.items()}
task_serializer = 'json'
persist_results = True
result_serializer = 'json'
accept_content = ['json']
imports = ('tasks',)
这是一个主要模块:
#main.py
from celery import Celery, signature
app = Celery('tasks')
app.config_from_object('celeryconfig')
add = signature('tasks.add')
print('1 + 1 = {}'.format(add.delay(1, 1).get(timeout=3.)))
在这里,当我尝试在windows上运行芹菜时,得到一个错误:
$ celery -A tasks worker --loglevel=INFO
[2021-04-03 18:16:35,578: CRITICAL/MainProcess] Unrecoverable error: ValueError("Port could not be cast to integer value as '\\Users\\marci\\code\\django\\cellery_test\\data\\results'")
Traceback (most recent call last):
File "c:python39libsite-packagesceleryworkerworker.py", line 203, in start
self.blueprint.start(self)
File "c:python39libsite-packagescelerybootsteps.py", line 112, in start
self.on_start()
File "c:python39libsite-packagesceleryappsworker.py", line 136, in on_start
self.emit_banner()
File "c:python39libsite-packagesceleryappsworker.py", line 170, in emit_banner
' n', self.startup_info(artlines=not use_image))),
File "c:python39libsite-packagesceleryappsworker.py", line 232, in startup_info
results=self.app.backend.as_uri(),
File "c:python39libsite-packagescelerybackendsbase.py", line 143, in as_uri
url = maybe_sanitize_url(self.url or '')
File "c:python39libsite-packageskombuutilsurl.py", line 118, in maybe_sanitize_url
return sanitize_url(url, mask)
File "c:python39libsite-packageskombuutilsurl.py", line 111, in sanitize_url
return as_url(*_parse_url(url), sanitize=True, mask=mask)
File "c:python39libsite-packageskombuutilsurl.py", line 76, in url_to_parts
parts.port,
File "c:python39liburllibparse.py", line 175, in port
raise ValueError(message) from None
ValueError: Port could not be cast to integer value as '\Users\marci\code\django\cellery_test\data\results'
路径解码似乎有问题。有人面临这个问题吗?我将感谢你的帮助!
在我的案例中,我在项目文件夹根目录中创建了一个.env
文件,其中包含以下变量:
REDIS_URL=redis://127.0.0.1:6379/
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
REDIS_DB=0
然后,确保你的项目中有一个redis.py
文件,与wsgi.py
文件所在的文件夹相同:
import redis as r
from .settings import REDIS_HOST, REDIS_PORT, REDIS_DB
redis = r.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
在celery.py
文件上,在redis.py
和wsgi.py
文件所在的同一文件夹中,放入类似的代码:
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
app = Celery('your_project')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
在同一个文件夹中,确保您已经创建了一个__init__.py
文件,其中包含:
from __future__ import absolute_import
from .celery import app as celery_app
在settings.py
中,我添加了以下代码:
import os
from dotenv import load_dotenv
...
load_dotenv()
...
REDIS_URL = os.getenv('REDIS_URL')
REDIS_HOST = os.getenv('REDIS_HOST')
REDIS_PORT = os.getenv('REDIS_PORT')
REDIS_DB = os.getenv('REDIS_DB')
BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}'
CELERY_RESULT_BACKEND = BROKER_URL
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
在这之后,你可以用本教程部署你的项目吗?例如:https://realpython.com/asynchronous-tasks-with-django-and-celery/或其他教程:https://www.botreetechnologies.com/blog/implementing-celery-using-django-for-background-task-processing/
我希望能帮助你。
我遇到的问题是在我传递给urllib.parse
的url中使用了特殊字符(/
、?
、#
、@
和:
(。一旦我把它们从小路上移开,效果就很好。
我会首先检查结果后端URL是否正确(检查https://en.wikipedia.org/wiki/File_URI_scheme获取更多信息(。我不知道filesystem://
是一个可能的经纪人替代品。即使是这样,它也很可能是高度实验性的,所以我建议在它成熟之前不要使用它(我真诚地怀疑这是否会发生,因为我真的不明白这一点——Celery应该是一个分布式系统,所以文件系统作为代理对我来说毫无意义(。请使用此处列出的经纪人:https://docs.celeryproject.org/en/stable/getting-started/brokers/index.html或者,如果你有使用不在清单上的东西的硬性要求,就不要使用芹菜。