当我运行celery -A tasks2.celery worker -B
时,我希望看到"芹菜任务"每秒钟打印一次。目前没有打印任何内容。为什么这不起作用?
from app import app
from celery import Celery
from datetime import timedelta
celery = Celery(app.name, broker='amqp://guest:@localhost/', backend='amqp://guest:@localhost/')
celery.conf.update(CELERY_TASK_RESULT_EXPIRES=3600,)
@celery.task
def add(x, y):
print "celery task"
return x + y
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'tasks2.add',
'schedule': timedelta(seconds=1),
'args': (16, 16)
},
}
这是启动worker和beat后的唯一输出:
[tasks]
. tasks2.add
[INFO/Beat] beat: Starting...
[INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[INFO/MainProcess] mingle: searching for neighbors
[INFO/MainProcess] mingle: all alone
您编写了时间表,但没有将其添加到芹菜配置中。因此,beat没有看到要发送的计划任务。下面的示例使用celery.config_from_object(__name__)
从当前模块获取配置值,但是您也可以使用任何其他配置方法。
正确配置后,您将看到来自beat的关于发送计划任务的消息,以及worker接收和运行这些任务时的输出。
from celery import Celery
from datetime import timedelta
celery = Celery(__name__)
celery.config_from_object(__name__)
@celery.task
def say_hello():
print('Hello, World!')
CELERYBEAT_SCHEDULE = {
'every-second': {
'task': 'example.say_hello',
'schedule': timedelta(seconds=5),
},
}
$ celery -A example.celery worker -B -l info
[tasks]
. example.say_hello
[2015-07-15 08:23:54,350: INFO/Beat] beat: Starting...
[2015-07-15 08:23:54,366: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2015-07-15 08:23:54,377: INFO/MainProcess] mingle: searching for neighbors
[2015-07-15 08:23:55,385: INFO/MainProcess] mingle: all alone
[2015-07-15 08:23:55,411: WARNING/MainProcess] celery@netsec-ast-15 ready.
[2015-07-15 08:23:59,471: INFO/Beat] Scheduler: Sending due task every-second (example.say_hello)
[2015-07-15 08:23:59,481: INFO/MainProcess] Received task: example.say_hello[2a9d31cb-fe11-47c8-9aa2-51690d47c007]
[2015-07-15 08:23:59,483: WARNING/Worker-3] Hello, World!
[2015-07-15 08:23:59,484: INFO/MainProcess] Task example.say_hello[2a9d31cb-fe11-47c8-9aa2-51690d47c007] succeeded in 0.0012782540870830417s: None
在4.1.0版本中,您必须将logger
添加到task.py
文件中,如下所示:
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@task(name="multiply_two_numbers")
def mul(x, y):
total = x * (y * random.randint(3, 100))
#HERE:
logger.info('Adding {0} + {1}'.format(x, y))
return total
如果你想要更多信息,在文档的中间说明:http://docs.celeryproject.org/en/latest/userguide/tasks.html
我实际上在命令提示符上打印我的命令时遇到了问题,因为我使用了错误的命令,但是我发现了一个项目的链接,这个项目是我分叉的
- (Mac)
celery -A Project worker --loglevel=info
- (如果在Windows上)
celery -A Project worker -l info --pool=solo
确保为计划任务运行了celery beat worker:
celery beat --app app.celery
查看这里的文档:http://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#starting-the-scheduler
我知道这是一个古老的线程,但对于那些遇到它…请检查您的应用程序配置。确保"测试";设置为"false"。在我的例子中,它是一个Flask APP,我也在用Flask APP config
更新芹菜配置celery.conf.update(app.config)
但是要确保TESTING没有设置为true,