I have a tasks.py file that looks like this:
from celery import Celery
import subprocess
from flask import flash

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')

@app.task(track_started=True)
def move_files(upRepo, filedir):
    subprocess.call(['aptly', 'repo', 'add', upRepo, filedir])

@app.task(track_started=True)
def make_snapshot(existRepoName, snapName):
    subprocess.call(['aptly', 'snapshot', 'create', snapName, 'from', 'repo', existRepoName])
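The tasks run fine, and everything works when the subprocess.call command succeeds. However, when subprocess.call fails, the task still technically completes, so Celery reports it as "successful".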
I have been looking at after_return in the docs, but I am not sure that is what I want, and I do not know how I would implement it…
Thanks!
Try subprocess.check_call: if the command exits with a non-zero status, it raises an exception.
import subprocess
subprocess.check_call('false')
Output:
Traceback (most recent call last):
  File "zcheck.py", line 3, in <module>
    subprocess.check_call('false')
  File "/usr/lib/python2.7/subprocess.py", line 540, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'false' returned non-zero exit status 1
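To make the difference concrete, here is a minimal sketch that runs the same failing command through both functions. The failing command is an assumption for illustration: it is built from the current Python interpreter so the example does not depend on a `false` binary being installed.

```python
import subprocess
import sys

# A command that always exits with status 1 (illustrative stand-in for `false`).
failing_cmd = [sys.executable, "-c", "import sys; sys.exit(1)"]

# subprocess.call returns the exit status and does not raise on failure,
# which is why a task built on it still finishes normally.
status = subprocess.call(failing_cmd)
print("call returned:", status)

# subprocess.check_call raises CalledProcessError on a non-zero status.
try:
    subprocess.check_call(failing_cmd)
    raised = False
except subprocess.CalledProcessError as exc:
    raised = True
    print("check_call raised, returncode =", exc.returncode)
```

With subprocess.call the failure is only visible if the caller inspects the returned status; with subprocess.check_call it surfaces as an exception.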
Applied to the original code, this becomes:
tasks.py
from celery import Celery
import subprocess
from flask import flash

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')

# check_call raises CalledProcessError on a non-zero exit status,
# so the exception propagates out of the task and the task fails.
@app.task(track_started=True)
def move_files(upRepo, filedir):
    subprocess.check_call(['aptly', 'repo', 'add', upRepo, filedir])

@app.task(track_started=True)
def make_snapshot(existRepoName, snapName):
    subprocess.check_call(['aptly', 'snapshot', 'create', snapName, 'from', 'repo', existRepoName])
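If the failure report should also carry the command's error output, one possible variation is sketched below. It is not part of the answer above: `run_checked` is a hypothetical helper name, and the sketch assumes Python 3.7 or later, because it relies on `subprocess.run` with `capture_output=True` rather than the Python 2.7 API shown in the traceback. The demo commands are stand-ins for the aptly calls.

```python
import subprocess
import sys


def run_checked(cmd):
    """Hypothetical helper: run cmd and raise with its stderr on failure."""
    try:
        completed = subprocess.run(
            cmd, check=True, capture_output=True, text=True
        )
    except subprocess.CalledProcessError as exc:
        # Include the captured stderr so the error message explains the failure.
        raise RuntimeError(
            "command %r failed with status %d: %s"
            % (cmd, exc.returncode, (exc.stderr or "").strip())
        ) from exc
    return completed.stdout


# Illustrative commands standing in for the aptly invocations.
ok_cmd = [sys.executable, "-c", "print('done')"]
bad_cmd = [
    sys.executable,
    "-c",
    "import sys; sys.stderr.write('boom'); sys.exit(2)",
]

output = run_checked(ok_cmd)

try:
    run_checked(bad_cmd)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

print(output.strip())
print(error_message)
```

A task body could call such a helper in place of subprocess.check_call. Any exception that escapes the task function causes Celery to record the task as failed, so the stderr text would then appear alongside the failure.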