<T> 如果 X 分钟内没有新项目进入频道,如何读取小于批量大小的频道中的剩余项目?



我正在使用System.Threading.Channels中的Channel,并希望批量读取项目(5个项目(,我有一个如下的方法,

public class Batcher
{
private readonly Channel<MeasurementViewModel> _channel;
public Batcher()
{
_channel = Channel.CreateUnbounded<MeasurementViewModel>();
}
public async Task<MeasurementViewModel[]> ReadBatchAsync(int batchSize, CancellationToken stoppingToken)
{
var result = new MeasurementViewModel[batchSize];
for (var i = 0; i < batchSize; i++)
{
result[i] = await _channel.Reader.ReadAsync(stoppingToken);
}
return result;
}
}

在ASP.NET核心后台服务中,我像下面这样使用它,

public class WriterService : BackgroundService
{
private readonly Batcher _batcher;
public WriterService(Batcher batcher)
{
_batcher = batcher;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var batchOfItems = await _batcher.ReadBatchAsync(5, stoppingToken);
var range = string.Join(',', batchOfItems.Select(item => item.Value));
var x = range;
}
}
}

这是有效的,每当Channel中有5个项目时,我就会得到range

问题是,当Channel中只剩下2个项目,并且由于最后10分钟没有项目进入Channel,那么如何读取Channel中剩下的2个项目?

您可以创建一个链接的CancellationTokenSource,这样您就可以同时监视外部取消请求和内部引发的超时。以下是使用此技术的示例,通过为ChannelReader类创建ReadBatchAsync扩展方法:

public static async ValueTask<T[]> ReadBatchAsync<T>(
this ChannelReader<T> channelReader,
int batchSize, TimeSpan timeout, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(channelReader);
if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
if (timeout < TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(timeout));
using CancellationTokenSource linkedCTS = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
linkedCTS.CancelAfter(timeout);
List<T> buffer = new();
while (true)
{
var token = buffer.Count == 0 ? cancellationToken : linkedCTS.Token;
T item;
try
{
item = await channelReader.ReadAsync(token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
cancellationToken.ThrowIfCancellationRequested();
break; // The cancellation was induced by timeout (ignore it)
}
catch (ChannelClosedException)
{
if (buffer.Count == 0) throw;
break;
}
buffer.Add(item);
if (buffer.Count >= batchSize) break;
}
return buffer.ToArray();
}

此方法将在指定的timeout结束后立即生成批次,或者如果达到batchSize,则会更早生成批次,前提是该批次至少包含一个项目。否则,一旦收到第一个项目,它就会生产一批单个项目。

如果通道已通过调用channel.Writer.Complete()方法完成,并且它不包含更多项,则ReadBatchAsync方法传播由本机ReadAsync方法抛出的相同ChannelClosedException

在外部CancellationToken被取消的情况下,通过抛出OperationCanceledException来传播取消。此时可能已经从ChannelReader<T>内部提取的任何项目都将丢失。这使得取消功能成为一个破坏性操作。建议在那之后丢弃整个Channel<T>

用法示例:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (true)
{
MeasurementViewModel[] batch;
try
{
batch = await _channel.Reader.ReadBatchAsync(
5, TimeSpan.FromMinutes(10), stoppingToken);
}
catch (OperationCanceledException) { return; }
catch (ChannelClosedException) { break; }
Console.WriteLine(String.Join(',', batch.Select(item => item.Value)));
}
await _channel.Reader.Completion; // Propagate possible failure
}

对于批量消费通道的另一种方法,其取消是非破坏性的,你可以看看这个问题:

  • 如何批处理ChannelReader,在消费和处理任何单个项目之间强制执行最大间隔策略

最新更新