我有一个非常简单的实现。
/lib/queue/__init__.py
from celery import Celery
from os import environ
REDIS_URI = environ.get('REDIS_URI')
app = Celery('tasks',
broker=f'redis://{REDIS_URI}')
app.autodiscover_tasks([
'lib.queue.cache',
], force=True)
/lib/queue/cache/tasks.py
from lib.queue import app
@app.task
def some_task():
pass
Dockerfile
RUN git clone <my_repo> /usr/src/lib
WORKDIR /usr/src/lib
RUN python3 setup.py install
CMD ["celery", "-A", "worker:app", "worker", "--loglevel=info", "--concurrency=4"]
/worker.py
from lib.queue import app
如果我在没有 Docker 的情况下初始化命令行,这就可以正常工作。
celery -A worker:app worker --loglevel=info
> [tasks]
> . lib.queue.cache.tasks.some_task
但是,当我在 Docker 中运行它时,任务仍然是空白的:
> [tasks]
问题:
关于为什么芹菜无法在 Docker 中找到库和任务的任何想法?我正在使用另一个设置几乎相同的Dockerfile
来推送任务,并且它能够毫无问题地访问lib.queue.cache.tasks
。
因为我多次被要求提供我的解决方案,所以就在这里。但是,这可能并没有真正的帮助,因为我现在正在做的事情略有不同。
在我的工作线程文件中,定义了app
,我只有一个任务。
app = Celery("tasks", broker=f"redis://{REDIS_URI}:{REDIS_PORT}/{REDIS_DB}")
@app.task
def run_task(task_name, *args, **kwargs):
print(f"Running {task_name}. Received...")
print(f"- args: {args}")
print(f"- kwargs: {kwargs}")
module_name, method_name = task_name.split(".")
module = import_module(f".{module_name}", package="common.tasks")
task = getattr(module, method_name)
loop = asyncio.get_event_loop()
retval = loop.run_until_complete(task(*args, **kwargs))
这可能与大多数人无关,因为它需要一个字符串参数来导入协程并执行它。这实际上是因为我的任务共享了一些也需要在异步世界中执行的功能。