分布式python:Celery send_task获取COMMAND_INVALID


  1. 上下文

我开发了一个Flask API,它将任务发送到我的计算环境。要使用它,您应该向API发出一个post请求。然后,API接收您的请求,对其进行处理,并通过RABBITMQ代理发送必要的数据,这是一条由计算环境持有的消息。最后,它应该将结果发送回API

  1. 一些代码

下面是我的API和Celery应用程序的示例:

#main.py
# Package
import time
from flask import Flask
from flask import request, jsonify, make_response
# Own module
from celery_app import celery_app
# Environment
app = Flask()
# Endpoint
@app.route("/test", methods=["POST"])
def test():
"""
Test route
Returns
-------
Json formatted output
"""
# Do some preprocessing in here 
result = celery_app.send_task(f"tasks.Client", args=[1, 2])
while result.state == "PENDING":
time.sleep(0.01)
result = result.get()
if result["sucess"]:
result_code = 200
else:
result_code = 500
output = str(result)
return make_response(
jsonify(
text=output,
code_status=result_code,        ),
result_code,
)

# Main thread
if __name__ == "__main__":
app.run()

在另一个文件中,我设置了连接到RABBITMQ队列的芹菜应用程序

#celery_app.py
from celery import Celery, Task
celery_app = Celery("my_celery",
broker=f"amqp://{USER}:{PASSWORD}@{HOSTNAME}:{PORT}/{COLLECTION}",
backend="rpc://"
)
celery_app.conf.task_serializer = "pickle"
celery_app.conf.result_serializer = "pickle"
celery_app.conf.accept_content = ["pickle"]
celery_app.conf.broker_connection_max_retries = 5
celery_app.conf.broker_pool_limit = 1
class MyTask(Task):
def run(self, a, b):
return a + b
celery_app.register_task(MyTask())

要运行它,您应该启动:

python3 main.py

不要忘记运行芹菜工作程序(在其中注册任务后)

然后你可以在上面发布请求:

curl -X POST http://localhost:8000/test
  1. 要解决的问题

当这个简单的API运行时,我在端点上发送请求。不幸的是,它在4上失败了1次

我有两条消息:

  • 第一条消息是:
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
  • 然后,由于超时,我的服务器丢失了消息,因此:
File "main.py", line x, in test
result = celery_app.send_task("tasks.Client", args=[1, 2])
amqp.exceptions.InvalidCommand: Channel.close_ok: (503) COMMAND_INVALID - unimplemented method
  1. 解决此错误

有两种解决方案可以绕过这个问题

  • 重试发送任务,直到任务连续失败5次(try/except-amqp.exceptions.InvalidCommand)

  • 更改超时值。

不幸的是,这似乎不是解决它的最佳方法。

你能帮我吗?

问候

PS:

  • my_packages:

烧瓶=2.0.2

python==3.6

芹菜==4.4.5

rabbitmq==最新

1。预处理失败

我将RabbitMQ版本从最新版本更改为3.8.14

然后,我使用time_limit和soft_time_limit设置了一个芹菜任务超时。

它起作用:)

2.无效命令

为了解决这个问题,我使用了这个重试函数。

I设置:

# max_retries=3
# autoretry_for=(InvalidCommand,)

最新更新