我应该如何避免使用邮件枪,任务队列和ndb发送重复的电子邮件?



我正在使用任务队列API发送多封电子邮件是带有mailgun的小团体。我的代码看起来或多或少像这样:

class CpMsg(ndb.Model):
group = ndb.KeyProperty()
sent = ndb.BooleanProperty()
#Other properties

def send_mail(messages):
"""Sends a request to mailgun's API"""
# Some code
pass

class MailTask(TaskHandler):
def post(self):
p_key = utils.key_from_string(self.request.get('p'))
msgs = CpMsg.query(
CpMsg.group==p_key,
CpMsg.sent==False).fetch(BATCH_SIZE)
if msgs:
send_mail(msgs)
for msg in msgs:
msg.sent = True
ndb.put_multi(msgs)
#Call the task again in COOLDOWN seconds

上面的代码一直工作正常,但根据文档,任务队列 API 保证任务至少交付一次,因此任务应该是幂等的。现在,大多数情况下,上述代码就是这种情况,因为它只获取具有"已发送"属性等于 False 的消息。问题在于,非祖先ndb查询只是最终一致的,这意味着如果任务快速连续执行两次,则查询可能会返回过时的结果并包含刚刚发送的消息。

我想过为消息包含一个祖先,但由于发送的电子邮件将达到数千个,我担心这可能意味着拥有大型实体组,其写入吞吐量有限。

我应该使用祖先进行查询吗?或者也许有一种方法可以配置邮件枪以避免两次发送相同的电子邮件?我是否应该接受在极少数情况下可能会多次发送几封电子邮件的风险?

避免最终一致性障碍的一种可能方法是使查询keys_only查询,然后遍历消息键以通过键查找(强一致性)获取实际消息,检查msg.sent是否为 True,并在这种情况下跳过发送这些消息。大致如下:

msg_keys = CpMsg.query(
CpMsg.group==p_key,
CpMsg.sent==False).fetch(BATCH_SIZE, keys_only=True)
if not msg_keys:
return
msgs = ndb.get_multi(msg_keys)
msgs_to_send = []
for msg in msgs:
if not msg.sent:
msgs_to_send.append(msg)
if msgs_to_send:
send_mail(msgs_to_send)
for msg in msgs_to_send:
msg.sent = True
ndb.put_multi(msgs_to_send)

您还必须使post调用具有事务性(使用@ndb.transactional()装饰器)。

这应该解决查询最终一致性导致的重复问题。但是,由于数据存储争用(或任何其他原因)导致的事务重试仍存在重复的空间 - 因为send_mail()调用不是幂等的。一次发送一条消息(可能使用任务队列)可以减少发生这种情况的可能性。另请参阅 GAE/P:API 调用的交易安全

相关内容

最新更新