如何批处理 IAsyncEnumerable<T>,在连续批处理之间强制实施最大间隔策略?



我有一个异步消息序列(流(,这些消息有时是大量到达的,有时是零星到达的,我希望以每批10条消息的方式分批处理它们。我还想对接收消息和处理消息之间的延迟设置上限,因此,如果在接收到该批的第一条消息后已经过了5秒,则也应该处理少于10条消息的批。我发现我可以通过使用系统中的Buffer运算符来解决问题的第一部分。互动的异步包:

IAsyncEnumerable<Message> source = GetStreamOfMessages();
IAsyncEnumerable<IList<Message>> batches = source.Buffer(10);
await foreach (IList<Message> batch in batches)
{
// Process batch
}

Buffer操作员的签名:

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, int count);

不幸的是,Buffer运算符没有TimeSpan参数的重载,所以我不能很容易地解决问题的第二部分。我将不得不以某种方式实现一个带有计时器的批处理运算符。我的问题是:如何实现具有以下签名的Buffer运算符的变体?

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count);

timeSpan参数应影响Buffer操作员的行为,如

  1. timeSpan在发出上一个批处理之后(或最初在调用Buffer方法之后(已经过去时,必须发出批处理
  2. 如果在发出前一批之后timeSpan已经过去,并且在此期间没有收到任何消息,则必须发出空批
  3. 比每个timeSpan更频繁地发出批次意味着批次已满。在timeSpan过去之前发出一个消息少于count的批是不可取的

如果需要,我可以向我的项目添加外部依赖项,比如系统。互动的异步或系统。林克。异步包。

第页。这个问题的灵感来自最近一个与通道和内存泄漏有关的问题。

使用Channel来实现所需的功能怎么样?如果使用类似这样的扩展方法从队列中读取直到超时,是否存在任何缺陷?

public static async Task<List<T>> ReadWithTimeoutAsync<T>(this ChannelReader<T> reader, TimeSpan readTOut, CancellationToken cancellationToken)
{
var timeoutTokenSrc = new CancellationTokenSource();
timeoutTokenSrc.CancelAfter(readTOut);
var messages = new List<T>();
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSrc.Token, cancellationToken))
{
try
{
await foreach (var item in reader.ReadAllAsync(linkedCts.Token))
{
messages.Add(item);
linkedCts.Token.ThrowIfCancellationRequested();
}
Console.WriteLine("All messages read.");
}
catch (OperationCanceledException)
{
if (timeoutTokenSrc.Token.IsCancellationRequested)
{
Console.WriteLine($"Delay ({readTOut.Milliseconds} msec) for reading items from message channel has expired.");
}
else if (cancellationToken.IsCancellationRequested)
{
Console.WriteLine("Cancelling per user request.");
cancellationToken.ThrowIfCancellationRequested();
}
}
}
timeoutTokenSrc.Dispose();
return messages;
}

为了将超时与最大批量大小相结合,可以再添加一个令牌源:

public static async Task<List<T>> ReadBatchWithTimeoutAsync<T>(this ChannelReader<T> reader, int maxBatchSize, TimeSpan readTOut, CancellationToken cancellationToken)
{
var timeoutTokenSrc = new CancellationTokenSource();
timeoutTokenSrc.CancelAfter(readTOut);
var maxSizeTokenSrc = new CancellationTokenSource();
var messages = new List<T>();
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSrc.Token, maxSizeTokenSrc.Token, cancellationToken))
{
try
{
await foreach (var item in reader.ReadAllAsync(linkedCts.Token))
{
messages.Add(item);
if (messages.Count >= maxBatchSize)
{
maxSizeTokenSrc.Cancel();
}
linkedCts.Token.ThrowIfCancellationRequested();
}....

下面的解决方案使用PeriodicTimer类(.NET 6(接收计时器通知,并使用Task.WhenAny方法协调计时器和枚举任务。为此,PeriodicTimer类比Task.Delay方法更方便,因为它可以直接处理,而不需要附带的CancellationTokenSource

/// <summary>
/// Splits the elements of a sequence into chunks that are emitted when either
/// they are full, or a given amount of time has elapsed after requesting the
/// previous chunk.
/// </summary>
public static async IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(source);
if (timeSpan < TimeSpan.FromMilliseconds(1.0))
throw new ArgumentOutOfRangeException(nameof(timeSpan));
if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
using CancellationTokenSource linkedCts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
PeriodicTimer timer = null;
Task<bool> StartTimer()
{
timer = new(timeSpan);
return timer.WaitForNextTickAsync().AsTask();
}
IAsyncEnumerator<TSource> enumerator = source
.GetAsyncEnumerator(linkedCts.Token);
Task<bool> moveNext = null;
try
{
List<TSource> buffer = new();
TSource[] ConsumeBuffer()
{
timer?.Dispose();
TSource[] array = buffer.ToArray();
buffer.Clear();
if (buffer.Capacity > count) buffer.Capacity = count;
return array;
}
Task<bool> timerTickTask = StartTimer();
while (true)
{
if (moveNext is null)
{
if (timerTickTask.IsCompleted)
{
Debug.Assert(timerTickTask.Result);
yield return ConsumeBuffer();
timerTickTask = StartTimer();
}
moveNext = enumerator.MoveNextAsync().AsTask();
}
if (!moveNext.IsCompleted)
{
Task completedTask = await Task.WhenAny(moveNext, timerTickTask)
.ConfigureAwait(false);
if (ReferenceEquals(completedTask, timerTickTask))
{
Debug.Assert(timerTickTask.IsCompleted);
Debug.Assert(timerTickTask.Result);
yield return ConsumeBuffer();
timerTickTask = StartTimer();
continue;
}
}
Debug.Assert(moveNext.IsCompleted);
bool moved = await moveNext.ConfigureAwait(false);
moveNext = null;
if (!moved) break;
TSource item = enumerator.Current;
buffer.Add(item);
if (buffer.Count == count)
{
yield return ConsumeBuffer();
timerTickTask = StartTimer();
}
}
if (buffer.Count > 0) yield return ConsumeBuffer();
}
finally
{
// Cancel the enumerator, for more responsive completion.
try { linkedCts.Cancel(); }
finally
{
// The last moveNext must be completed before disposing.
if (moveNext is not null && !moveNext.IsCompleted)
await Task.WhenAny(moveNext).ConfigureAwait(false);
await enumerator.DisposeAsync().ConfigureAwait(false);
timer?.Dispose();
}
}
}

每次发出区块时,在消费者完成对区块的消费后,计时器都会重新启动。

在线演示。

此实现是破坏性的,这意味着在source序列失败或枚举被取消的情况下,先前从source消耗并缓冲的任何元素都将丢失。有关如何注入非破坏性行为的想法,请参阅此问题。

已采取措施避免泄漏即发即弃MoveNextAsync操作或定时器。

对于使用Task.Delay方法而不是PeriodicTimer类的实现,因此它可以由使用。NET 6.0以前的版本,你可以看看这个答案的第7次修订版。该修订还包括一个诱人但有缺陷的基于Rx的实现。

相关内容

  • 没有找到相关文章

最新更新