我正在使用带有PHP https://github.com/gjedeer/celery-php 的Celery。可靠性对我们的项目很重要。所以我想确保所有计划的任务都执行,即使是 Rabbitmq 崩溃。所以我安排了一个任务,然后我停止了Rabbitmq并重新开始。当计划时间到来时,任务未执行。这是我的任务。
from celery import Celery
import subprocess
app = Celery('tasks', backend='amqp', broker='amqp://')
@app.task(acks_late=True)
def hipoCheckSubscriptions(args):
return subprocess.call(['php54','/path/to/script', '--args='+args])
请帮忙,我做错了什么?我的情况有什么食谱吗?
我为我的网络应用程序使用了一个名为celery-yii的Yii扩展。我在扩展的默认配置中没有找到任何消息持久性设置。所以我搜索了所有扩展类,以找到哪个方法负责向芹菜发布消息。我发现这是芹菜::P ostTask方法。在数组$params它将消息发布设置传递给芹菜交换。因此,我又添加了一个选项 delivery_mode=2 以实现消息持久性。并且此消息在RabbbitMQ崩溃后幸存下来。所以我用这种方式解决了我的问题。