我在这里关注教程,因为我想先测试兔子,然后封装在docker中。
我的结构看起来像:
-rabbitmq_docker
- test_celery
- __init__.py
- celery.py
- celeryconfig.py
- runtasks.py
- tasks.py
- docker-compose.yml
- dockerfile
- requirements.txt
CeleryConfig.py
## List of modules to import when celery starts
CELERY_IMPORTS = ['test_celery.tasks',]
## Message Broker (RabbitMQ) settings
BROKER_URL = "amqp://admin:mypass@192.168.2.15:5672//"
BROKER_PORT = 5672
## Result store settings
CELERY_RESULT_BACKEND = 'rpc://'
芹菜.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery('test_celery')
app.config_from_object('test_celery.celeryconfig', namespace='CELERY')
run_tasks.py
来自test_celery.tasks导入print_statements_test,你好
if __name__ == '__main__':
async_result = hello.delay()
print ("Task result hello: {0}".format(async_result.result))
async_result = print_statements_test.delay()
print ("Task result print_statements_test: {0}".format(async_result.result))
tasks.py
from __future__ import absolute_import
from test_celery.celery import app
@app.task(name='tasks.print_statements_test')
def print_statements_test():
new_order = ['first', 'second', 'third', 'forth', 'fifth', 'sixth']
terminal_output = []
for i in range(len(new_order)):
a = "This is the {0} line.n".format(new_order[i])
print(a)
terminal_output.append(a)
print ("terminal_output: {0}".format(terminal_output))
return terminal_output
@app.task(name='tasks.hello')
def hello():
print ("Hello there!")
我在 rabbitmq_docker
目录中运行 celery -A test_celery worker --loglevel=info
,我得到
[2019-07-09 16:25:46,702: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2019-07-09 16:25:46,716: INFO/MainProcess] mingle: searching for neighbors
[2019-07-09 16:25:47,754: INFO/MainProcess] mingle: all alone
[2019-07-09 17:38:54,675: INFO/MainProcess] celery@my-MBP.fritz.box ready.
我不明白的是为什么它说Connected to amqp://guest:**@127.0.0.1:5672//
,但是 celeryConfig.py 在 BROKER_URL 中包含不同的信息?
也在run_tasks中,任务是用delay((调用的,但是结果输出不显示此功能,只是挂在最后一行( [2019-07-09 17:38:54,675: INFO/MainProcess] celery@my-MBP.fritz.box ready.
(为什么不触发?
在您的celery.py
中,您有namespace='CELERY'
。您的BROKER_URL
设置需要重命名为CELERY_BROKER_URL
。
为什么它说它连接到Localost -Celery工人默认情况下连接到amqp://guest:**@127.0.0.1:5672//
,如果您不指定经纪人。当应该为broker_url
时,您使用了错误的配置变量(BROKER_URL(。使用芹菜的版本4,他们开始使用低循环配置变量(在此处阅读更多信息:https://celery.readthedocs.io/en/3.0/userguide/configuration.html#id6(
适用于结果后端类似。配置变量应为结果_backend。
如果您希望您的run_tasks.py阻止等待结果执行以下操作:
async_result = hello.delay()
print(async_result.get())
最后,请勿在芹菜任务中调用print((。改用芹菜记录仪:
from celery.utils.log import get_task_logger
LOGGER = get_task_logger(__name__)
# And use this object in your tasks...