我们可以使用Polly重试而不是ExponentialBackoffRetry在服务总线主题触发Azure功能?<



我们正在使用服务总线主题触发Azure函数,我们计划在Azure函数中实现一个简单的行为,如果在处理/处理过程中出现任何异常,我们希望将下一次重试从某个时间推迟。

目前我们计划使用[ExponentialBackoffRetry]属性,如下面的代码所示。

但是我们可以用Polly retry代替[ExponentialBackoffRetry]吗?基本上哪一种方法对我们的要求是空闲的-[ExponentialBackoffRetry]或Polly retry

下面是我们的服务总线主题触发Azure功能:

[FunctionName(nameof(CardGroupEventSubscriber))]
[ExponentialBackoffRetry(5, "00:00:04", "00:01:00")]
public async Task RunAsync([ServiceBusTrigger("%ServiceBusConfigOptions:TopicEventTypeName%", "%ServiceBusConfigOptions:TopicEventTypeSubscription%",
Connection = "ServiceBusConfigOptions:ConnectionString")]
string sbMsg)
{
try
{   
var message = sbMsg.AsPoco<CardGroupEvent>();
_logger.LogInformation("{class} - {method} - {RequestId} - Start",
nameof(CardGroupEventSubscriber), nameof(CardGroupEventSubscriber.RunAsync), message.RequestID);
_logger.LogInformation($"Started processing message {message.AsJson()} with", nameof(CardGroupEventSubscriber));
var validationResult = new CardGroupEventValidator().Validate(message);
if (validationResult.IsValid)
{
await _processor.ProcessAsync(message);
}

catch (Exception ex)
{
_logger.LogError($"Unable to process card group event {sbMsg.AsJson()} with {nameof(CardGroupEventSubscriber)}," +
$" ExceptionMessage:{ex.Message}, StackTrace: {ex.StackTrace}");
throw;
}
#endregion
}

可以命令式地定义和使用Polly的策略。
ExponentialBackoffRetry属性可以被认为是声明性的。

假设你想定义一个策略

  • 使用抖动进行指数回退
  • 仅当抛出CosmosException时然后你像这样做:
const int maxRetryAttempts = 10;
const int oneSecondInMilliSeconds = 1000;
const int maxDelayInMilliseconds = 32 * oneSecondInMilliSeconds;
var jitterer = new Random();
var policy = Policy
.Handle<CosmosException>()
.WaitAndRetryAsync(
maxRetryAttempts,
retryAttempt =>
{
var calculatedDelayInMilliseconds = Math.Pow(2, retryAttempt) * oneSecondInMilliSeconds;
var jitterInMilliseconds = jitterer.Next(0, oneSecondInMilliSeconds);
var actualDelay = Math.Min(calculatedDelayInMilliseconds + jitterInMilliseconds, maxDelayInMilliseconds);
return TimeSpan.FromMilliseconds(actualDelay);
}
);
  • 我在这里实现了我自己的指数回退
    • 主要原因是为了减少包的数量(所以,我们不需要Polly.Contrib.WaitAndRetry)

现在让我们把它应用到你的RunAsync方法

[FunctionName(nameof(CardGroupEventSubscriber))]
public async Task RunAsync([ServiceBusTrigger("%ServiceBusConfigOptions:TopicEventTypeName%", "%ServiceBusConfigOptions:TopicEventTypeSubscription%",
Connection = "ServiceBusConfigOptions:ConnectionString")]
string sbMsg)
=> await GetExponentialBackoffRetryPolicy.ExecuteAsync(() => RunCoreAsync(sbMsg));
private async Task RunCoreAsync(string sbMsg)
{
try
...
}
  • 我已经将原来的RunAsync的代码移动到RunCoreAsync方法
  • 我用一行代码替换了RunAsync的实现,它创建了上述策略,然后装饰了RunCoreAsync

只是一个边注:在CosmosDb的情况下,以不同的方式处理速率限制/节流可能是有意义的。

当我收到CosmosExceptionStatusCode是429然后使用RetryAfter的值延迟重试,像这样的东西

var policy = Policy
.Handle<CosmosException>(ex => ex.StatusCode == HttpStatusCode.TooManyRequests)
.WaitAndRetryAsync(maxRetryAttempts,
sleepDurationProvider:(_, ex, __) => ((CosmosException)ex).RetryAfter.Value,
onRetryAsync: (_, __, ___, ____) => Task.CompletedTask);

UPDATE # 1:结合两个策略

如果您愿意,您可以将上述两个策略组合在一起。你所需要做的就是使它们独立。因此,无论发生什么,应该只触发其中一个策略。最简单的解决方案是将这个ex => ex.StatusCode != HttpStatusCode.TooManyRequests谓词传递给指数回退策略

IAsyncPolicy GetExponentialBackoffRetryPolicy()
=> Policy
.Handle<CosmosException>(ex => ex.StatusCode != HttpStatusCode.TooManyRequests)
.WaitAndRetryAsync(
maxRetryAttempts,
retryAttempt =>
{
var calculatedDelayInMilliseconds = Math.Pow(2, retryAttempt) * oneSecondInMilliSeconds;
var jitterInMilliseconds = jitterer.Next(0, oneSecondInMilliSeconds);
var actualDelay = Math.Min(calculatedDelayInMilliseconds + jitterInMilliseconds, maxDelayInMilliseconds);
return TimeSpan.FromMilliseconds(actualDelay);
}
);
IAsyncPolicy GetThrottlingAwareRetryPolicy()
=> Policy
.Handle<CosmosException>(ex => ex.StatusCode == HttpStatusCode.TooManyRequests)
.WaitAndRetryAsync(maxRetryAttempts,
sleepDurationProvider: (_, ex, __) => ((CosmosException)ex).RetryAfter.Value,
onRetryAsync: (_, __, ___, ____) => Task.CompletedTask);

为了将这两个组合成一个你有很多选择,我建议使用Policy.WrapAsync

IAsyncPolicy retryPolicy = Policy.WrapAsync(GetExponentialBackoffRetryPolicy(), GetThrottlingAwareRetryPolicy());
//OR
IAsyncPolicy retryPolicy = Policy.WrapAsync(GetThrottlingAwareRetryPolicy(), GetExponentialBackoffRetryPolicy());

这里的顺序不重要,因为它们是独立的策略。