我有两个rabbitmq节点,每个节点为运行Celery的5个节点提供队列。
10个工作节点中的每一个都以4的并发性运行Celery。
当我向每个Celery广播rate_limit时,速率似乎不是强制的。
我用来广播rate_limit的代码:
app = Celery('tasks', broker="amqp://%s:%s@%s/%s" % (config.rabbit_user, config.rabbit_pass, rabbit_ip, config.rabbit_vhost))
app.control.broadcast('rate_limit', arguments={'task_name': 'tasks.read', 'rate_limit': '100/s'})
根据我的理解,这个限制是针对每个worker实例的,而不是全局的,因此,我预计10个节点中的每个芹菜worker每秒会有100条消息。通常情况下,它们每秒消耗大约500-600,但在广播之后,我在每个节点上每秒消耗大约1.6。我查看了日志,大约每5秒有8条消息。
将限制提高到200/s,我的速度在2/s到2.4/s之间。
看起来这是一个问题,并进行了推送来解决它。https://github.com/celery/celery/issues/3166