我正试图让 Celery 在它自己的容器中运行,与我的 Flask 应用程序分开。现在我只是在搭建一个简单的电子邮件应用程序。该容器的 CMD 是
["celery", "worker", "--loglevel=info"]
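消息被发送到 Redis broker,Celery 也接收到了它,但 Celery 报出如下错误:
“收到类型为 'flask_project.views.send_async_email' 的未注册任务。该消息已被忽略并丢弃。”(Received unregistered task of type 'flask_project.views.send_async_email'. The message has been ignored and discarded.)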
我已经在 Flask 应用程序的 Celery 配置中设置了 include。我也重启并重新构建了容器,但问题依旧。
from flask import Blueprint, current_app
from flask_mail import Mail
from os import getenv
from celery import Celery
from .support_func import decorator_require_api
# Blueprint collecting the routes defined in this module.
views = Blueprint('views', __name__)

# Celery application, named after the blueprint and pointed at the Redis
# broker service.
# NOTE(review): `include` lists "views.tasks", while the worker reports the
# task from this module as 'flask_project.views.send_async_email' -- confirm
# that the listed module path exists and is importable by the worker.
celery = Celery(
    views.name,
    broker='redis://redis:6379/0',
    include=["views.tasks"],
)
@celery.task
def send_async_email(email_data):
    """Send an e-mail from a Celery worker.

    Registered as a Celery task through ``@celery.task``; the ``home`` view
    enqueues it with ``send_async_email.delay(email_data)``.

    Args:
        email_data: Payload forwarded unchanged to ``Mail.send``.
    """
    # NOTE(review): the caller passes a plain dict and a fresh Mail() is
    # created on every call -- verify that Mail.send accepts this payload and
    # that the worker has a configured Flask application available.
    mail = Mail()
    mail.send(email_data)
@views.route('/')
def home():
    """Queue a test e-mail and return a confirmation string.

    Returns:
        The literal string ``"Message sent!"``, returned after the task has
        been handed to Celery.
    """
    with current_app.app_context():
        email_data = {
            'sender': getenv('MAIL_USERNAME'),
            'recipients': ['mrjoli021@gmail.com'],
            'message': "This is a test email",
        }
        # Hand the payload to the Celery task instead of sending inline.
        send_async_email.delay(email_data)
    return "Message sent!"
Compose:
---
# docker-compose definition: Flask app, nginx front end, Celery worker and
# Redis broker, all attached to the shared "api" bridge network.
version: "3.9"
services:
  # Flask application, built from ./Docker/flask with the source mounted in.
  flask:
    build:
      context: ./Docker/flask
    container_name: flask
    volumes:
      - ./app/:/app
    restart: unless-stopped
    stdin_open: true
    #entrypoint: /bin/bash
    networks:
      - api
  # nginx in front of Flask; host port 5000 maps to container port 443.
  nginx:
    image: nginx:latest
    container_name: nginx
    depends_on:
      - flask
    #entrypoint: /bin/bash
    volumes:
      - ./nginx_config:/etc/nginx/conf.d
      - ./app/:/app
    ports:
      - "5000:443"
    networks:
      - api
  # Celery worker, built from ./Docker/celery.
  # NOTE(review): unlike flask and nginx, no ./app volume is mounted here --
  # confirm the application code is actually present inside this image.
  celery:
    build:
      context: ./Docker/celery
    container_name: celery
    depends_on:
      - redis
    restart: unless-stopped
    stdin_open: true
    networks:
      - api
  # Redis, reachable by the other services under the host name "redis".
  redis:
    image: redis:latest
    container_name: redis
    depends_on:
      - flask
    #entrypoint: /bin/bash
    networks:
      - api
networks:
  api:
    driver: bridge
-----------------
DockerFile:
# Image for the Celery worker container.
FROM python:3.9.7-slim-buster
WORKDIR /app
# The package list continues on the next line, so the trailing backslash is
# required; without it "build-essential" is read as a separate instruction.
RUN apt-get update && apt-get install -y \
    build-essential # python-dev libssl-dev openssl
COPY ./ .
RUN pip3 install -r requirements.txt
# Broker URL; "redis" is the compose service name.
ENV CELERY_BROKER_URL=redis://redis:6379/0
# NOTE(review): no --app/-A option is passed here, so the worker is not told
# which Celery application to load -- confirm this is intended.
CMD ["celery", "worker", "--loglevel=info"]
您需要通过 --app(或 -A)标志把 Celery 应用程序传递给 worker(请参阅此处的答案/示例)。
我建议稍作重构,把下面这段代码提取出来:
# Celery application object: named after the blueprint, using the Redis
# broker, with "views.tasks" listed as a module to import.
celery = Celery(
    views.name,
    broker='redis://redis:6379/0',
    include=["views.tasks"],
)
放到一个单独的文件中(例如 celery_app.py),然后在 Flask 应用程序中导入它,并在启动 worker 时使用它:
["celery", "--app", "your_module.celery_app:celery", "worker", "--loglevel=info"]
您应该能在 worker 的启动日志中看到已注册的任务(也就是出现大写 C 的 Celery 徽标的时候)。
我终于想通了。我参考了 https://blog.miguelgrinberg.com/post/celery-and-the-flask-application-factory-pattern 。现在我可以注册新的蓝图,而无需改动 Celery 配置。这仍是一项进行中的工作,但现在所有容器都已启动并正常运行。
.
├── Docker
│   ├── celery
│   │   ├── Dockerfile
│   │   └── requirements.txt
│   └── flask
│       ├── Dockerfile
│       └── requirements.txt
├── app
│   ├── flask_project
│   │   ├── __init__.py
│   │   ├── celery_app.py
│   │   └── views.py
├── docker-compose.yml
Compose:
--------------------------------------------------------------------------------
---
# docker-compose definition: Flask app, nginx front end, Celery worker and
# Redis broker, all attached to the shared "api" bridge network.
version: "3.9"
services:
  # Flask application, built from ./Docker/flask with the source mounted in.
  flask:
    build:
      context: ./Docker/flask
    container_name: flask
    volumes:
      - ./app/:/app
    restart: unless-stopped
    stdin_open: true
    networks:
      - api
  # nginx in front of Flask; host port 5000 maps to container port 443.
  nginx:
    image: nginx:latest
    container_name: nginx
    depends_on:
      - flask
    #entrypoint: /bin/bash
    volumes:
      - ./nginx_config:/etc/nginx/conf.d
      - ./app/:/app
    ports:
      - "5000:443"
    networks:
      - api
  # Celery worker; mounts the same ./app source tree as the flask service.
  celery:
    build:
      context: ./Docker/celery
    container_name: celery
    depends_on:
      - redis
    volumes:
      - ./app/:/app
    restart: unless-stopped
    stdin_open: true
    networks:
      - api
  # Redis, reachable by the other services under the host name "redis".
  redis:
    image: redis:latest
    container_name: redis
    depends_on:
      - flask
    #entrypoint: /bin/bash
    networks:
      - api
networks:
  api:
    driver: bridge
celery_app.py:
--------------------------------------------------------------------------------
from . import celery, create_app
# Build the Flask application through the factory, then push an application
# context at import time so that it stays active for code running after this
# module has been imported.
# NOTE(review): presumably this is the module handed to the worker through
# --app (as "<package>.celery_app:celery") -- confirm against the worker CMD.
app = create_app()
app.app_context().push()
__init__.py:
--------------------------------------------------------------------------------
from os import getenv

from celery import Celery
from flask import Flask
# Package-level Celery application; the broker URL is read from the
# CELERY_BROKER_URL environment variable.
celery = Celery(
    __name__,
    broker=getenv('CELERY_BROKER_URL'),
)
def create_app():
    """Create the Flask application and wire it to Celery and the blueprints.

    Returns:
        The newly created ``Flask`` application, with the ``views`` blueprint
        registered under the ``/`` URL prefix.
    """
    app = Flask(__name__)

    # Celery stuff: copy the Flask configuration into the shared Celery app.
    celery.conf.update(app.config)

    # Register Blueprints. The import is done here rather than at module top
    # because .views itself imports `celery` from this package.
    from .views import views
    app.register_blueprint(views, url_prefix='/')

    return app
views.py:
--------------------------------------------------------------------------------
from flask import Blueprint, current_app
from flask_mail import Message, Mail
from os import getenv
from . import celery
# Blueprint holding this module's routes; create_app() registers it on the app.
views = Blueprint('views', __name__)
@celery.task
def send_async_email(email_data):
    """Build a ``Message`` from *email_data* and send it (Celery task).

    Args:
        email_data: Mapping providing the keys ``'subject'``, ``'sender'``,
            ``'recipients'`` and ``'message'``.

    Raises:
        KeyError: If *email_data* is a dict lacking one of the keys above.
    """
    msg = Message(
        email_data['subject'],
        sender=email_data['sender'],
        recipients=email_data['recipients'],
    )
    msg.body = email_data['message']

    mail = Mail()
    mail.send(msg)
@views.route('/')
def home():
    """Queue a test e-mail through Celery and return a confirmation string.

    Only the plain ``email_data`` dict is handed to the task; the ``Message``
    object is built inside ``send_async_email``, so none is constructed here.

    Returns:
        The literal string ``"Message sent!"``, returned after the task has
        been handed to Celery.
    """
    with current_app.app_context():
        email_data = {
            'sender': getenv('MAIL_USERNAME'),
            'recipients': ['some_email@gmail.com'],
            'subject': 'testing123',
            'message': "testing123",
        }
        send_async_email.delay(email_data)
    return "Message sent!"