- 上下文
我开发了一个Flask API,它将任务发送到我的计算环境。要使用它,您应该向API发出一个post请求。然后,API接收您的请求,对其进行处理,并通过RABBITMQ代理发送必要的数据,这是一条由计算环境持有的消息。最后,它应该将结果发送回API
- 一些代码
下面是我的API和Celery应用程序的示例:
#main.py
# Package
import time
from flask import Flask
from flask import request, jsonify, make_response
# Own module
from celery_app import celery_app
# Environment
app = Flask()
# Endpoint
@app.route("/test", methods=["POST"])
def test():
"""
Test route
Returns
-------
Json formatted output
"""
# Do some preprocessing in here
result = celery_app.send_task(f"tasks.Client", args=[1, 2])
while result.state == "PENDING":
time.sleep(0.01)
result = result.get()
if result["sucess"]:
result_code = 200
else:
result_code = 500
output = str(result)
return make_response(
jsonify(
text=output,
code_status=result_code, ),
result_code,
)
# Main thread
if __name__ == "__main__":
app.run()
在另一个文件中,我设置了连接到RABBITMQ队列的芹菜应用程序
#celery_app.py
from celery import Celery, Task
celery_app = Celery("my_celery",
broker=f"amqp://{USER}:{PASSWORD}@{HOSTNAME}:{PORT}/{COLLECTION}",
backend="rpc://"
)
celery_app.conf.task_serializer = "pickle"
celery_app.conf.result_serializer = "pickle"
celery_app.conf.accept_content = ["pickle"]
celery_app.conf.broker_connection_max_retries = 5
celery_app.conf.broker_pool_limit = 1
class MyTask(Task):
def run(self, a, b):
return a + b
celery_app.register_task(MyTask())
要运行它,您应该启动:
python3 main.py
不要忘记运行芹菜工作程序(在其中注册任务后)
然后你可以在上面发布请求:
curl -X POST http://localhost:8000/test
- 要解决的问题
当这个简单的API运行时,我在端点上发送请求。不幸的是,它在4上失败了1次。
我有两条消息:
- 第一条消息是:
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
- 然后,由于超时,我的服务器丢失了消息,因此:
File "main.py", line x, in test
result = celery_app.send_task("tasks.Client", args=[1, 2])
amqp.exceptions.InvalidCommand: Channel.close_ok: (503) COMMAND_INVALID - unimplemented method
- 解决此错误
有两种解决方案可以绕过这个问题
重试发送任务,直到任务连续失败5次(try/except-amqp.exceptions.InvalidCommand)
更改超时值。
不幸的是,这似乎不是解决它的最佳方法。
你能帮我吗?
问候
PS:
- my_packages:
烧瓶=2.0.2
python==3.6
芹菜==4.4.5
rabbitmq==最新
1。预处理失败
我将RabbitMQ版本从最新版本更改为3.8.14。
然后,我使用time_limit和soft_time_limit设置了一个芹菜任务超时。
它起作用:)
2.无效命令
为了解决这个问题,我使用了这个重试函数。
I设置:
# max_retries=3
# autoretry_for=(InvalidCommand,)