我正在编写一个长计算的Flask服务器&;optimal_portfolio()&;我希望客户端发送一个请求来在后台启动计算,然后再发送另一个请求来获得结果。我试图使用芹菜和Redis进行后台计算,但是当我试图访问芹菜任务时,我只得到任务。status = PENDING.
from portfolio_creator.portfolio_creator import optimal_portfolio
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from celery import Celery
import os
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = os.environ['REDISTOGO_URL']
app.config['CELERY_RESULT_BACKEND'] = os.environ['REDISTOGO_URL']
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@celery.task
def calculate_optimal_portfolio(risk_coefficient):
return optimal_portfolio(risk_coefficient).to_json(orient="records")
@app.route('/portfolio-creator', methods=['POST'])
def set_portfolio():
risk_coefficient = request.form.get('risk_coefficient', 5)
task = calculate_optimal_portfolio.apply_async(
args=[risk_coefficient])
return jsonify({"task_id": task.id}), 202
@app.route('/portfolio-creator/<task_id>', methods=['GET'])
def get_portfolio(task_id):
result = celery.AsyncResult(task_id).result
if result is None:
return 'Portfolio not ready.', 404
return result, 200
if __name__ == '__main__':
app.run()
我也试过使用Flask会话,但这似乎不适用于芹菜。
有一个类似的问题,我的任务正在执行,但状态始终是PENDING
。
你需要配置芹菜来更新它的状态。我这样配置我的flask应用,然后状态就正确更新了。
app.config.from_mapping(
CELERY=dict(
broker_url=REDIS_URL,
result_backend=REDIS_URL,
task_ignore_result=False,
task_track_started=True,
)
)
task_ignore_Result=False
是SUCCESS
状态出现的必要条件。task_track_started
是STARTED
状态出现的必要条件。
更多信息在这里:https://docs.celeryq.dev/en/stable/userguide/configuration.html。如果另一个阅读本文的人不使用烧瓶,这个答案应该有助于配置没有烧瓶的芹菜https://stackoverflow.com/a/40831283/5915915