我们使用Google App Engine和内置的TaskQueues通过Mandrill发送每日更新电子邮件。它们是在特定时间发送的,通常有很多。我们为要发送的每封电子邮件安排一个任务,其中一些任务失败是因为 Mandrill API 超时或我们达到了某个速率限制。
我们的代码如下所示:
@app.route('/worker/send_transactional_email/', methods=['POST'])
def worker_send_transactional_mail():
payload = json.loads(request.values.get('payload'))
message = {
'to': [payload.get('to')],
'subject': payload.get('subject'),
'from_name': 'Our App',
'from_email': "noreply@ourapp.local",
'text': payload.get('body_text')
}
mandrill_payload = _get_mandrill_payload(message)
url = "https://mandrillapp.com/api/1.0/messages/send-template.json"
req = urllib2.Request(url, mandrill_payload, {'Content-Type': 'application/json'})
try:
urllib2.urlopen(req)
except urllib2.URLError as error:
logging.info("to: " + unicode(payload.get('to')))
logging.info("subject: " + unicode(payload.get('subject')))
logging.info(error.code)
logging.info(error.read())
abort(500)
return 'ok'
该解决方案运行良好,因为失败的任务稍后将重试,然后通常会完成。我们面临的唯一问题是失败的任务显示在错误日志中,并导致Google云监控报告错误。
我想要的是将上面except
块中的abort(500)
替换为告诉 TaskQueue 重试任务但不记录任何错误或其他内容的内容。我知道我可以返回 200-299 以外的任何状态代码,并且 TaskQueue 将重试,但我不确定正确的方法是返回 301 或其他内容,因为它具有高度误导性。
如果在队列上配置重试,则无需更改代码,因为 500 响应将让队列逻辑知道触发重试。除非达到最大重试次数,否则无需手动重新排队,在这种情况下,您应该有一些其他方法来确保您可以明确跟踪任务的所需完成情况并重新排队,或者只是通过检查 X-Appengine-Taskretrycount
标头来捕获最后一次重试。如文档中所述,请确保任务是幂等的。
基于@Nick检查X-Appengine-Taskretrycount
请求标头值并将处理程序设置为终止的答案,我在我的 Flask 应用程序__init__.py
文件中有这个:
from flask import Flask, request, make_response, jsonify
...
@app.before_request
def max_retries_exit():
max_retries = 5
task_retries = request.headers.get('X-Appengine-Taskretrycount', '', type=int)
if task_retries and task_retries > max_retries:
output = []
status_code = 200 if request.method in ['GET', 'HEAD'] else 201
r = make_response(jsonify(output), status_code)
error_msg = f"Task retries exceeded {task_retries} > {max_retries}"
r.headers['X-App-Error'] = error_msg
app.logger.error(error_msg)
return r
else:
return None