当命令执行失败时,芹菜任务成功



我有一个tasks.py文件,看起来像这样:

from celery import Celery
import subprocess
from flask import flash
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
@app.task(track_started=True)
def move_files(upRepo, filedir):
    subprocess.call(['aptly', 'repo', 'add', upRepo, filedir])
@app.task(track_started=True)
def make_snapshot(existRepoName, snapName):
    subprocess.call(['aptly', 'snapshot', 'create', snapName, 'from', 'repo', existRepoName])

任务顺利通过,当subprocess.call命令成功时一切正常,只是当subprocess.call失败时,任务在技术上仍然通过,因此芹菜将其报告为"成功"。

我一直在看after_return在文档中,但我不确定这是否是我想要的,我也不知道我将如何实现…

谢谢!

尝试subprocess.check_call—如果出现问题,它将抛出异常。

测试用例:

import subprocess
subprocess.check_call('false')

输出
Traceback (most recent call last):
  File "zcheck.py", line 3, in <module>
    subprocess.check_call('false')
  File "/usr/lib/python2.7/subprocess.py", line 540, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'false' returned non-zero exit status 1

对于原始代码,这变成:

tasks.py

from celery import Celery
import subprocess
from flask import flash
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
@app.task(track_started=True)
def move_files(upRepo, filedir):
    subprocess.check_call(['aptly', 'repo', 'add', upRepo, filedir])
@app.task(track_started=True)
def make_snapshot(existRepoName, snapName):
    subprocess.check_call(['aptly', 'snapshot', 'create', snapName, 'from', 'repo', existRepoName])

相关内容

  • 没有找到相关文章

最新更新