如果消息处理后下游发生故障,我正在寻找重新排队消息的方法。有什么规律可以让我研究吗?
例子:
- EventGrid事件在blob上传到存储帐户 时触发
- 队列存储已订阅事件
- Azure功能由队列存储触发
- Function处理消息
- 从EventGrid消息中解析BlobURL
- 操纵blob
- 成功完成
- 从这里下游发生了某种问题,函数需要重新处理队列存储消息
如何持久化队列消息直到所有下游进程完成?
我也有类似的情况。有时,由于需求没有得到满足,消息还不能被处理。我从创建RetryableMessage开始。
public class RetryableMessage<T>
{
// number of times we try to send the message before giving up
public int TriesRemaining = 15;
public T Message { get; set; }
}
我把实际的消息放在message字段中,然后,如果消息处理不顺利,但如果我再试一次,它可能会顺利,我做这样的事情
if (shouldRetry)
{
queuedMessage.TriesRemaining--;
if (queuedMessage.TriesRemaining > 0)
{
queue.Add(queuedMessage);
}
}
这只是将它添加回队列的末尾,所以我可以再试一次。我包含了一个最大尝试次数,所以如果它永远不会结束工作,它会停止重新排队。
这适用于我的情况,因为它通常处理很多消息,其中一个将使这一个将工作。如果您一次只有一条消息,并且它不工作,则此模式不适合您。
由于您正在使用Azure Storage Queue
,消息将具有At least once
的Delivery Guarantee
,这意味着一旦您收到它们,它们将被隐藏visibilitytimeout
持续时间(默认为30秒),然后显示在队列中。在处理完每条消息后,必须显式地删除它们,因此可以在完成所有下游操作后推迟删除部分。或者,如果这些操作中的任何一个失败,消息将在其可见性超时结束后自动可用。
正如Azure文档中所说,使用Azure存储队列是绕过您遇到的问题的方法之一:
Your application wants to track progress for processing a message in the queue. It's useful if the worker processing a message crashes. Another worker can then use that information to continue from where the prior worker left off.