芹菜和芹菜 - "Received unregistered task of type..."



好了,伙计们,这快把我逼疯了。我已经为这个工作了一整天了,现在还不能让它工作!
我的芹菜项目结构是这样的:

# celery.py
from celery.schedules import crontab
from celery import Celery
celery = Celery('scheduler.celery',
                include=['scheduler.tasks'])
celery.config_from_object('celeryconfig')

:

# tasks.py
from scheduler.celery import celery
@celery.task
def test():
    do_something()

:

# celeryconfig.py
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
    'test-cron': {
        'task': 'tasks.test',
         'schedule': crontab(minute='*/1'),
    },
}
# CELERY_IMPORTS = ('tasks', )
BROKER_URL = 'redis://localhost:6379/0'

所有文件都在projects/scheduler/文件夹下。
当我启动celeryd服务时,我可以看到它正在运行并连接到我的代理,但是当我启动celerybeat服务时,我可以在日志中看到消息:Received unregistered task of type 'tasks.test' .
如果我取消CELERY_IMPORTS常量的注释(正如在SO中许多答案中建议的那样),celeryd服务甚至不会启动!实际上,它输出OK,但使用ps ef | grep celery,我可以看到它没有运行。

我的守护进程配置文件是这样的:

# Name of nodes to start
CELERYD_NODES="w1"
# Where to chdir at start.
CELERYD_CHDIR="/home/me/projects/scheduler/"
# Extra arguments to celeryd
CELERYD_OPTS="--time-limit=300 --concurrency=4"
# %n will be replaced with the nodename.
CELERYD_LOG_FILE="/var/log/celery/%n.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"
# Workers should run as an unprivileged user.
CELERYD_USER="celery"
CELERYD_GROUP="celery"
# Extra arguments to celerybeat
CELERYBEAT_OPTS="--schedule=/var/run/celerybeat-schedule"

如果你没有scheduler模块,从模块root运行芹菜:

runner.py :

from celery_test import celery
if __name__ == '__main__':
    celery.start()

celery.py , celery_test.py :

from celery import Celery
celery = Celery('scheduler.celery',
                include=['tasks'])
celery.config_from_object('celeryconfig')

celeryconfig.py :

from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
    'test-cron': {
        'task': 'tasks.test',
         'schedule': crontab(minute='*/1'),
    },
}
BROKER_URL = 'redis://localhost:6379/0'

tasks.py , fixed import:

from celery_test import celery
@celery.task
def test():
    do_something()

有你必须小心的导入和添加runner.py,因为它导入celery_test,然后tasks里面再次导入celery_test


如果你有scheduler模块,并从模块根运行芹菜:

__init__.py empty.

runner.py :

from scheduler.celery_test import celery
if __name__ == '__main__':
    celery.start()

celery.py , celery_test.py :

from celery import Celery
celery = Celery('scheduler.celery',
                include=['scheduler.tasks'])
celery.config_from_object('celeryconfig')      

celeryconfig.py ,固定默认任务名称:

from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
    'test-cron': {
        'task': 'scheduler.tasks.test',
         'schedule': crontab(minute='*/1'),
    },
}
BROKER_URL = 'redis://localhost:6379/0'

tasks.py :

from scheduler.celery_test import celery
@celery.task
def test():
    do_something()

相关内容

  • 没有找到相关文章

最新更新