芹菜任务立即自动发现



我正在尝试在Celery框架之上构建一个应用程序。

我有一个模块settings/celery_settings.py,其中包含初始化 Celery 应用程序的代码,如下所示(我扩展了一些变量):

from __future__ import absolute_import
from celery import Celery
pfiles = ['other_tasks.test123', 'balance_log.balance_log']
app = Celery('myapp')
# here I just have some parameters defined like broker, result backend, etc
# app.config_from_object(settings)
# TRYING to discover tasks
app.autodiscover_tasks(pfiles)

文件other_tasks/test123.pybalance_log/balance_log.py包含如下任务定义:

# file other_tasks/test123.py
from celery import shared_task, Task
@shared_task()
def mytask():
    print("Test 1234!")
class TestTask01(Task):
    def run(self, client_id=None):
        logger.debug("TestTask01: run")
        return client_id

我经营芹菜工人:

python3 /usr/local/bin/celery -A settings.celery_settings worker

通过这种方式,它可以发现任务。我可以调用这些任务。

但后来我尝试使用 IPython:

In [1]: from settings.celery_settings import app
In [2]: app.tasks
Out[2]: 
{'celery.backend_cleanup': <@task: celery.backend_cleanup of XExchange:0x7f9f50267ac8>,
 'celery.chain': <@task: celery.chain of XExchange:0x7f9f50267ac8>,
 'celery.chord': <@task: celery.chord of XExchange:0x7f9f50267ac8>,
 'celery.chord_unlock': <@task: celery.chord_unlock of XExchange:0x7f9f50267ac8>,
 'celery.chunks': <@task: celery.chunks of XExchange:0x7f9f50267ac8>,
 'celery.group': <@task: celery.group of XExchange:0x7f9f50267ac8>,
 'celery.map': <@task: celery.map of XExchange:0x7f9f50267ac8>,
 'celery.starmap': <@task: celery.starmap of XExchange:0x7f9f50267ac8>}

显然它没有发现任务。

似乎当我显式调用任务时,我首先导入它们并在调用时指定芹菜的确切路径。这就是它起作用的原因。

问:如何使它们被发现具有已知任务列表?

最后我发现autodiscover_tasks函数还有一个附加参数:

def autodiscover_tasks(self, packages, related_name='tasks', force=False):
    ...

因此,设置force=True后,它开始工作!

app.autodiscover_tasks(pfiles, force=True)
这是我

的示例配置:

conf/celeryconfig

from conf import settings
CELERYD_CHDIR='/usr/local/src/imbue/application/imbue'
CELERY_IGNORE_RESULT = False
CELERY_RESULT_BACKEND = "amqp"
CELERY_TASK_RESULT_EXPIRES = 360000
CELERY_RESULT_PERSISTENT = True
BROKER_URL='amqp://<USERNAME>:<PASSWORD>@rabbitmq:5672'
CELERY_ENABLE_UTC=True
CELERY_TIMEZONE= "US/Eastern"
CELERY_IMPORTS=("hypervisor.esxi.vm_operations",
                "tools.deploy_tools",)

虚拟机监控程序/ESXi vm_operations.py

@task(bind=True, default_retry_delay=300, max_retries=5)
def cancel_job(self, host_id=None, vm_id=None, job=None, get_job=False, **kwargs):
    pass

call_task.py

def call_task():
    log.info('api() | Sending task: ' + job_instance.reference)     
    celery = Celery()
    celery.config_from_object('conf.celeryconfig')
    celery.send_task("hypervisor.esxi.vm_operations.cancel_job",
                     kwargs={'job': job_instance,
                             'get_job': True},
                     task_id=job_instance.reference)

我将芹菜与主管一起使用,并从 conf 目录开始:

source ~/.profile
CELERY_LOGFILE=/usr/local/src/imbue/application/imbue/log/celeryd.log
CELERYD_OPTS=" --loglevel=INFO --autoscale=10,5"
cd /usr/local/src/imbue/application/imbue/conf
exec celery worker -n celeryd@%h -f $CELERY_LOGFILE $CELERYD_OPTS

相关内容

  • 没有找到相关文章

最新更新