我有一个带有blob和队列触发器的Azure WebKob,用于将数据保存到Azure DocumentDb。
有时我得到一个错误:
Microsoft.Azure.Documents。RequestRateTooLargeException: Message: {"Errors":["Request rate is large"]}
目前我使用这段代码限制请求。WebJob函数:
public async Task ParseCategoriesFromCsv(...)
{
double find = 2.23, add = 5.9, replace = 10.67;
double requestCharge = Math.Round(find + Math.Max(add, replace));
await categoryProvider.SaveCategories(requestCharge , categories);
}
操作文档数据库客户端的类别提供程序:
public async Task<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories)
{
var requestDelay = TimeSpan.FromSeconds(60.0 / (collectionOptions.RequestUnits / requestCharge));
var scheduler = new IntervalTaskScheduler(requestDelay, Scheduler.Default); // Rx
var client = new DocumentClient(endpoint, authorizationKey,
new ConnectionPolicy
{
ConnectionMode = documentDbOptions.ConnectionMode,
ConnectionProtocol = documentDbOptions.ConnectionProtocol
});
return await Task.WhenAll(documents.Select(async d =>
await scheduler.ScheduleTask(
() => client.PutDocumentToDb(collectionOptions.CollectionLink, d.SearchIndex, d))));
}
控制/测量/同步请求的任务调度程序:
private readonly Subject<Action> _requests = new Subject<Action>();
private readonly IDisposable _observable;
public IntervalTaskScheduler(TimeSpan requestDelay, IScheduler scheduler)
{
_observable = _requests.Select(i => Observable.Empty<Action>()
.Delay(requestDelay)
.StartWith(i))
.Concat()
.ObserveOn(scheduler)
.Subscribe(action => action());
}
public Task<T> ScheduleTask<T>(Func<Task<T>> request)
{
var tcs = new TaskCompletionSource<T>();
_requests.OnNext(async () =>
{
try
{
T result = await request();
tcs.SetResult(result);
}
catch (Exception ex)
{
tcs.SetException(ex);
}
});
return tcs.Task;
}
基本上是ResourceResponse<Document>.RequestCharge
的一些常量但是:
- 当我有1个队列触发它工作正常,但当8队列它抛出一个错误。
- 如果将请求费用增加8倍,则8个队列可以正常工作,但只有1个队列的工作速度慢8倍。
什么节流/测量/同步机制可以在这里很好地工作?
从。net SDK 1.8.0开始,我们自动在合理范围内处理请求率过大的异常(默认情况下重试9次,并在从服务器返回下一次重试后尊重重试)。
如果你需要更好的控制,你可以在ConnectionPolicy实例上配置RetryOptions,你传递给DocumentClient对象,我们会用它覆盖默认的重试策略。
因此,您不再需要像上面那样在应用程序代码中添加任何自定义逻辑来处理429异常。
当获得429(请求率太大)时,响应会告诉您需要等待多长时间。有一个标头x-ms-retry-after。这是有值的。在ms中等待该时间段
catch (AggregateException ex) when (ex.InnerException is DocumentClientException)
{
DocumentClientException dce = (DocumentClientException)ex.InnerException;
switch ((int)dce.StatusCode)
{
case 429:
Thread.Sleep(dce.RetryAfter);
break;
default:
Console.WriteLine(" Failed: {0}", ex.InnerException.Message);
throw;
}
}
在我看来,你应该能够用你的SaveCategories
方法做到这一点,使它与Rx很好地工作:
public IObservable<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories)
{
var requestDelay = TimeSpan.FromSeconds(60.0 / (collectionOptions.RequestUnits / requestCharge));
var client = new DocumentClient(endpoint, authorizationKey,
new ConnectionPolicy
{
ConnectionMode = documentDbOptions.ConnectionMode,
ConnectionProtocol = documentDbOptions.ConnectionProtocol
});
return
Observable.Interval(requestDelay)
.Zip(documents, (delay, doc) => doc)
.SelectMany(doc => Observable.FromAsync(() => client.PutDocumentToDb(collectionOptions.CollectionLink, doc.SearchIndex, doc)))
.ToArray();
}
这完全摆脱了您的IntervalTaskScheduler
类,并确保您将请求率限制为每个requestDelay
时间跨度的一个请求,但允许响应花费尽可能长的时间。To .ToArray()
调用将返回多个值的IObservable<ResourceResponse<Document>>
转换为在可观察对象完成时返回单个值数组的IObservable<ResourceResponse<Document>[]>
。
我无法测试你的代码,所以我测试了一个我认为模拟你的代码的样本:
var r = new Random();
var a = Enumerable.Range(0, 1000);
var i = Observable.Interval(TimeSpan.FromSeconds(2.0));
var sw = Stopwatch.StartNew();
var query =
i.Zip(a, (ii, aa) => aa)
.SelectMany(aa => Observable.Start(() =>
{
var x = sw.Elapsed.TotalMilliseconds;
Thread.Sleep(r.Next(0, 5000));
return x;
}))
.Select(x => new
{
started = x,
ended = sw.Elapsed.TotalMilliseconds
});
我得到了这样的结果,这表明请求被限制了:
4026.2983 5259.7043
2030.1287 6940.2326
6027.0439 9664.1045
8027.9993 10207.0579
10028.1762 12301.4746
12028.3190 12711.4440
14040.7972 17433.1964
16040.9267 17574.5924
18041.0529 19077.5545