是否有重播Azure队列存储消息的模式?



如果消息处理后下游发生故障,我正在寻找重新排队消息的方法。有什么规律可以让我研究吗?

例子:

  • EventGrid事件在blob上传到存储帐户
  • 时触发
  • 队列存储已订阅事件
  • Azure功能由队列存储触发
  • Function处理消息
    • 从EventGrid消息中解析BlobURL
    • 操纵blob
    • 成功完成
  • 从这里下游发生了某种问题,函数需要重新处理队列存储消息

如何持久化队列消息直到所有下游进程完成?

我也有类似的情况。有时,由于需求没有得到满足,消息还不能被处理。我从创建RetryableMessage开始。

public class RetryableMessage<T>
{
// number of times we try to send the message before giving up
public int TriesRemaining = 15;
public T Message { get; set; }
}

我把实际的消息放在message字段中,然后,如果消息处理不顺利,但如果我再试一次,它可能会顺利,我做这样的事情

if (shouldRetry)
{
queuedMessage.TriesRemaining--;
if (queuedMessage.TriesRemaining > 0)
{
queue.Add(queuedMessage);
}
}

这只是将它添加回队列的末尾,所以我可以再试一次。我包含了一个最大尝试次数,所以如果它永远不会结束工作,它会停止重新排队。

这适用于我的情况,因为它通常处理很多消息,其中一个将使这一个将工作。如果您一次只有一条消息,并且它不工作,则此模式不适合您。

由于您正在使用Azure Storage Queue,消息将具有At least onceDelivery Guarantee,这意味着一旦您收到它们,它们将被隐藏visibilitytimeout持续时间(默认为30秒),然后显示在队列中。在处理完每条消息后,必须显式地删除它们,因此可以在完成所有下游操作后推迟删除部分。或者,如果这些操作中的任何一个失败,消息将在其可见性超时结束后自动可用。

正如Azure文档中所说,使用Azure存储队列是绕过您遇到的问题的方法之一:

Your application wants to track progress for processing a message in the queue. It's useful if the worker processing a message crashes. Another worker can then use that information to continue from where the prior worker left off.

最新更新