我一直在尝试使用位于 Redis 之上的 Redis 队列实现任务队列。我摆脱了这个问题,并根据我遇到的问题在 RabbitMQ 之上去了 Celery,如下所述: Redis 队列阻塞
我参考上述(未回答的(SO问题,因为我相信这两个问题足够相似,可以潜在地联系起来 - 无论是我的代码还是设置。
我能够将任务发送到我的 Celery 队列,并且能够通过在我的 Rabbit docker 容器 bash 中调用rabbitmqctl list_queues
或调用来查看它们坐在那里
>>> add_nums.delay(2,3)
<AsyncResult: 197315b1-e18b-4945-bf0a-cc6b6b829bfb>
>>> result = add_nums.AsyncResult( 197315b1-e18b-4945-bf0a-cc6b6b829bfb)
哪里
>>> result.status
'PENDING'
不管我检查多少次。
我尝试在装饰器调用中添加ignore_result=True
,但这没有效果。
我的工作人员类:
./workerA.py
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger( __name__)
# Celery configuration
CELERY_BROKER_URL = 'amqp://***:***@rabbit:5672/' #where the asterisks indicate user, pwd
CELERY_RESULT_BACKEND = 'rpc://'
# Initialize celery
celery = Celery( 'workerA',
broker=CELERY_BROKER_URL,
backend=CELERY_RESULT_BACKEND)
@celery.task( ignore_result=True)
def add_nums( a, b):
logger.info( f'{ a+b=}')
return a+b
我的主要:
./app.py
import logging
from flask.logging import default_handler
from workerA import add_nums
from workerB import sub_nums
from flask import (
Flask,
request,
jsonify,
)
logger = logging.getLogger( )
logger.addHandler( default_handler)
logger.setLevel( logging.INFO)
app = Flask( __name__)
@app.route( '/')
def index():
return 'hello world!'
@app.route( '/add')
def add():
logger.info( 'in add method')
first_num, second_num = ( 1,2)
logger.info( f' { first_num=}')
result = add_nums.delay( first_num, second_num)
logger.info( f' {result=}')
logger.info( f' {result.state=}')
return jsonify({ 'result': result.result}), 200
@app.route( '/subtract')
def subtract():
logger.info( 'in sub method')
first_num, second_num = ( 1,2)
result = sub_nums.delay( first_num, second_num)
logger.info( f' {result=}')
return jsonify( {'result': result.result}), 200
if __name__ == '__main__':
app.run( debug=True)
无论 n 设置多高,调用result.get( timeout=n)
总是会导致超时:简而言之,这些队列永远不会满足。
为了完整起见,我的docker-compose.yml:
version: "3"
services:
web:
build:
context: .
dockerfile: Dockerfile
restart: always
ports:
- 5000:5000
command: python ./app.py -h 0.0.0.0
depends_on:
- rabbit
volumes:
- .:/app
rabbit:
hostname: rabbit
image: rabbitmq:management
environment:
- RABBITMQ_DEFAULT_USER=***
- RABBITMQ_DEFAULT_PASS=***
ports:
- "5673:5672"
- "15672:15672"
worker_1:
build:
context: .
hostname: worker_1
entrypoint: celery
command: -A workerA worker --loglevel=info -Q workerA
volumes:
- .:/app
links:
- rabbit
depends_on:
- rabbit
worker_2:
build:
context: .
hostname: worker_2
entrypoint: celery
command: -A workerB worker --loglevel=info -Q workerB
volumes:
- .:/app
links:
- rabbit
depends_on:
- rabbit
和我的 Dockerfile:
FROM python:3
ADD requirements.txt /app/requirements.txt
WORKDIR /app/
RUN pip install -r requirements.txt
EXPOSE 5000
我正在使用适用于Mac 2.2.0.0的Docker桌面,我的OSX是10.15.2(卡塔琳娜(
关于这个问题的任何帮助将不胜感激。这些队列问题现在已成为我的严重障碍
此问题的原因是没有配置后端来存储结果。使用 Celery(..., backend='rpc://'( 实例化 Celery 对象似乎除了静音"未实现错误:未配置结果后端"错误之外什么也没做。我认为从这个意义上讲,文档具有误导性。
试用 Redis 结果后端以提高性能。我在其他地方也有Elasticsearch和MongoDB用于我的应用程序,我可以瞄准这些应用程序,但更喜欢Redis。午餐后,完成后会反馈结果。