从芹菜中禁用rabbitmq的全局qos



我使用三台aws ec2服务器创建了一个RabbitMQ 3节点集群。我正在尝试访问仲裁我用芹菜创建的队列。当我连接时,它给出了错误

raise error_for_code(reply_code, reply_text,
amqp.exceptions.AMQPNotImplementedError: Basic.consume: (540) NOT_IMPLEMENTED - queue 'Replica_que' in vhost '/' does not support global qos

我想它会工作,如果我禁用全局qos,但我找不到我可以做到的地方。如何在芹菜中禁用全局qos ?

我的芹菜代码

from celery import Celery
from time import sleep
import kombu

broker_uri=['amqp://xxxx:5672/', 'amqp://xxxx:5672/','amqp://xxx:5672/']
backend_uri="mongodb+srv://xxxxx"
app = Celery('TestApp', broker=broker_uri,backend=backend_uri)
app.config_from_object('celeryconfig')
app.conf.task_default_exchange='Replica_que'
app.conf.task_default_routing_key='Replica'
@app.task
def reverse(text):
sleep(10)
return text[:-1]

和配置代码

from kombu import Queue
task_queues = [Queue(name="Replica_que", queue_arguments={"x-queue-type": "quorum"})]
task_routes = {
'tasks.add': 'Replica_que',
}

可以通过添加celeryconfig.py文件

from kombu import Queue
task_queues = [Queue(name="Replica_que", queue_arguments={"x-queue-type": "quorum"})]
task_routes = {
'tasks.add': 'Replica_que',
}

和创建自定义QoS类:https://github.com/celery/celery/issues/6067

所以我添加了QoS类

class NoChannelGlobalQoS(bootsteps.StartStopStep):
requires = {'celery.worker.consumer.tasks:Tasks'}
def start(self, c):
qos_global = False
c.connection.default_channel.basic_qos(0, c.initial_prefetch_count, qos_global)
def set_prefetch_count(prefetch_count):
return c.task_consumer.qos(
prefetch_count=prefetch_count,
apply_global=qos_global,
)
c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)
app.steps['consumer'].add(NoChannelGlobalQoS)

目前这是一个与仲裁队列相关的问题,但这个工作。

相关内容

  • 没有找到相关文章

最新更新