Partition: How to add a wait after each partition



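I have an API that accepts 20 requests per minute. After that, I have to wait 1 minute before I can query it again. I have a list of items (usually 1000+) whose details I need to query from the API. I thought I could use Partitioner to split the list into partitions of 20 items (requests), but I soon realized that Partitioner doesn't work that way. My second idea was to add a delay inside the partition loop, but that is also a bad idea: as I understand it, that adds a delay after every single request, which is not needed. What I need instead is a delay after each partition. Here is my code: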

    public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
        int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
        [Optional] int delay)
    {
        var whenAll = await Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
            select Task.Run(async delegate {
                var allResponses = new List<V>();
                using (partition)
                    while (partition.MoveNext())
                    {
                        allResponses.Add(await body(partition.Current));
                        await Task.Delay(TimeSpan.FromSeconds(delay));
                    }
                return allResponses;
            }, token));
        return whenAll.SelectMany(x => x);
    }

Does anyone know how I can achieve this?
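For comparison, the "pause after each batch of 20" idea from the question can be sketched without Partitioner: split the list into fixed-size batches, run each batch concurrently, and wait a fixed pause before starting the next one. This is a minimal sketch, not the asker's or the answerer's code; the name `ForEachBatchAsync` is hypothetical, and it assumes .NET 6 or later for `Enumerable.Chunk`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BatchExtensions
{
    // Hypothetical helper: splits `source` into batches of `batchSize`,
    // runs each batch concurrently, and waits `pause` after a batch has
    // completed before starting the next one.
    public static async Task<List<V>> ForEachBatchAsync<T, V>(
        this IEnumerable<T> source, int batchSize, TimeSpan pause,
        Func<T, Task<V>> body, CancellationToken token = default)
    {
        var results = new List<V>();
        bool firstBatch = true;
        // Enumerable.Chunk is available from .NET 6 onward.
        foreach (T[] batch in source.Chunk(batchSize))
        {
            // No pause before the first batch, a full pause before every later one.
            if (!firstBatch) await Task.Delay(pause, token);
            firstBatch = false;
            token.ThrowIfCancellationRequested();
            // Task.WhenAll returns results in input order, so they line up with source.
            results.AddRange(await Task.WhenAll(batch.Select(body)));
        }
        return results;
    }
}
```

A call could look like `await items.ForEachBatchAsync(20, TimeSpan.FromMinutes(1), item => QueryApiAsync(item))`, where `QueryApiAsync` stands for a hypothetical method that queries the API. The approach is conservative: each batch waits for its slowest request plus a full minute, whereas a rate limiter lets each slot become free exactly one time unit after it was taken.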

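Here is a RateLimiter class that you can use to limit the frequency of asynchronous operations. A simpler implementation of the RateLimiter class can be found in this answer.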

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// Limits the number of workers that can access a resource, during the specified
    /// time span.
    /// </summary>
    public class RateLimiter
    {
        private readonly SemaphoreSlim _semaphore;
        private readonly TimeSpan _timeUnit;

        public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
        {
            if (maxActionsPerTimeUnit < 1)
                throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
            if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
                throw new ArgumentOutOfRangeException(nameof(timeUnit));
            _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
            _timeUnit = timeUnit;
        }

        public async Task WaitAsync(CancellationToken cancellationToken = default)
        {
            await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
            // Schedule the release of the semaphore using a Timer.
            // Use the newly created Timer object as the state object, to prevent GC.
            // Handle the unlikely case that the _timeUnit is invalid.
            System.Threading.Timer timer = new(_ => _semaphore.Release());
            try { timer.Change(_timeUnit, Timeout.InfiniteTimeSpan); }
            catch { _semaphore.Release(); throw; }
        }
    }

Usage example:

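    // GetUrls() and _httpClient (an HttpClient) come from the surrounding code.
    List<string> urls = GetUrls();
    // RateLimiter does not implement IDisposable, so it cannot be declared with `using`.
    var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));
    string[] documents = await Task.WhenAll(urls.Select(async url =>
    {
        await rateLimiter.WaitAsync();
        return await _httpClient.GetStringAsync(url);
    }));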

Online demo.

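The Timer is created with this specific constructor (the one that takes only a callback and uses the new Timer object itself as its state object) to prevent it from being garbage collected before it fires, as explained by Nick H. in this answer.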

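Note: this implementation is somewhat leaky, because it creates internal disposable System.Threading.Timer objects that are not disposed when you have finished using the RateLimiter. Any active timers will prevent the RateLimiter from being garbage collected until they have fired their callbacks. The SemaphoreSlim is also not disposed as it should be. These are minor flaws that are unlikely to matter in a program that creates only a few RateLimiters. If you intend to create many of them, you could take a look at the third revision of this answer, which features a disposable RateLimiter based on the Task.Delay method.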
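To illustrate the disposable, Task.Delay-based idea mentioned in the note, here is a minimal sketch. It is not the code of that third revision; the class name `DisposableRateLimiter` is hypothetical. Each acquired slot is released by a continuation of `Task.Delay`, and `Dispose` cancels all pending delays (which also frees their internal timers) before disposing the semaphore.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical class: a sketch of a disposable rate limiter, not the answer's code.
public sealed class DisposableRateLimiter : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;
    private readonly CancellationTokenSource _disposeCts = new CancellationTokenSource();

    public DisposableRateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        // Release the slot after one time unit. If the limiter is disposed first,
        // the delay is canceled and the continuation never runs.
        _ = Task.Delay(_timeUnit, _disposeCts.Token).ContinueWith(t =>
        {
            // Dispose may race with a completed delay; ignore a disposed semaphore.
            try { _semaphore.Release(); } catch (ObjectDisposedException) { }
        }, CancellationToken.None,
           TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously,
           TaskScheduler.Default);
    }

    public void Dispose()
    {
        _disposeCts.Cancel();   // cancels every pending Task.Delay
        _disposeCts.Dispose();
        _semaphore.Dispose();
    }
}
```

Usage mirrors the class above, except that the limiter can now be declared with `using var`. Calling `WaitAsync` after `Dispose` throws `ObjectDisposedException`.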


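Here is another, more complex implementation of the RateLimiter class, based on the Environment.TickCount64 property instead of a SemaphoreSlim. Its advantage is that it does not create fire-and-forget timers in the background. Its disadvantages are that the WaitAsync method does not accept a CancellationToken argument, and that, because of its complexity, bugs are more likely.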

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading.Tasks;

    public class RateLimiter
    {
        // Each entry is the timestamp (ms) at which a granted slot expires.
        private readonly Queue<long> _queue;
        private readonly int _maxActionsPerTimeUnit;
        private readonly int _timeUnitMilliseconds;

        public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
        {
            // Arguments validation omitted
            _queue = new Queue<long>();
            _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
            _timeUnitMilliseconds = checked((int)timeUnit.TotalMilliseconds);
        }

        public Task WaitAsync()
        {
            int delayMilliseconds = 0;
            lock (_queue)
            {
                long currentTimestamp = Environment.TickCount64;
                // Discard the slots that have already expired.
                while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
                {
                    _queue.Dequeue();
                }
                if (_queue.Count >= _maxActionsPerTimeUnit)
                {
                    // Wait until the slot that frees room for this caller expires.
                    long refTimestamp = _queue
                        .Skip(_queue.Count - _maxActionsPerTimeUnit).First();
                    delayMilliseconds = checked((int)(refTimestamp - currentTimestamp));
                    Debug.Assert(delayMilliseconds >= 0);
                    if (delayMilliseconds < 0) delayMilliseconds = 0; // Just in case
                }
                _queue.Enqueue(currentTimestamp + delayMilliseconds
                    + _timeUnitMilliseconds);
            }
            if (delayMilliseconds == 0) return Task.CompletedTask;
            return Task.Delay(delayMilliseconds);
        }
    }
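To see what the queue in the TickCount64-based version computes, here is a clock-free sketch of the same bookkeeping. It replays the queue logic against given arrival times (in milliseconds, assumed to be in ascending order, as a real monotonic clock would produce) and returns the delay each caller would receive. The class and method names are hypothetical and only serve the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SlidingWindowSketch
{
    // Hypothetical helper: mirrors the queue logic of the TickCount64-based
    // RateLimiter, but takes arrival times as input instead of reading a clock.
    public static long[] ComputeDelays(long[] arrivals, int maxActions, long windowMs)
    {
        var queue = new Queue<long>(); // expiry timestamp of each granted slot
        var delays = new long[arrivals.Length];
        for (int i = 0; i < arrivals.Length; i++)
        {
            long now = arrivals[i];
            // Drop slots that expired before this arrival.
            while (queue.Count > 0 && queue.Peek() < now) queue.Dequeue();
            long delay = 0;
            if (queue.Count >= maxActions)
            {
                // The slot maxActions positions from the end decides when room opens up.
                long refTimestamp = queue.Skip(queue.Count - maxActions).First();
                delay = Math.Max(0, refTimestamp - now);
            }
            queue.Enqueue(now + delay + windowMs);
            delays[i] = delay;
        }
        return delays;
    }
}
```

With `maxActions = 2` and a 1000 ms window, five callers arriving at time 0 get delays of 0, 0, 1000, 1000 and 2000 ms, so at most two operations start in any one-second window.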
