我在python中有两个任务,想在芹菜后台运行它们。 我使用以下函数来定义芹菜。
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
flask_app = Flask(__name__)
flask_app.config.update(
CELERY_BROKER_URL='redis://localhost:6379/0',
CELERY_RESULT_BACKEND='redis://localhost:6379/0'
)
celery = make_celery(flask_app)
####task 1
@celery.task(name="run1")
def code1(a, b):
return a * b
####task 2
@celery.task(name="run2")
def code2(c, d):
return c + d
code1.delay(5, 8)
code2.delay(77, 26)
我按如下方式运行芹菜工人:
celery -A run1.celery worker&
celery -A run2.celery worker&
但是当我将一些作业发送到后台时,我会遇到以下错误:
"收到类型为'run1'的未注册任务。 该消息已被忽略并丢弃。 您是否记得导入包含此任务的模块? 或者也许您正在使用相对导入?">
当我只运行其中一个芹菜工人时,它可以正常工作 - 但是当运行多个芹菜工人时,我会遇到此错误。
我知道我参加这个聚会太晚了,但也许这可以帮助其他人。
开始芹菜工人的正确方法:
celery -A <filename containing celery object>:<celery object> worker
标准调用是:
celery -A tasks:celery worker
其中tasks.py
包含函数和芹菜对象。芹菜将自动拾取文件中的每个任务/功能。