什么可能是频道阅读<T>器。阅读任何实现看起来像?



BlockingCollection<T>具有方便的静态TakeFromAny方法,允许您使用多个集合"我想要这些集合中的任何一个的下一个项目"。

ChannelReader<T>没有等效项,所以如果您确实想将多个通道消耗到单个流中 - 比如说将收到的项目打印到 1 个控制台 1,如何做到这一点?

快速路径很容易,但慢路径非常棘手。下面的实现返回一个Task<ValueTuple<T, int>>,其中包含从其中一个读取器获取的值,以及输入数组中该读取器的从零开始的索引。

public static Task<(T Item, int Index)> ReadFromAnyAsync<T>(
params ChannelReader<T>[] channelReaders) =>
ReadFromAnyAsync(channelReaders, CancellationToken.None);
public static async Task<(T Item, int Index)> ReadFromAnyAsync<T>(
ChannelReader<T>[] channelReaders,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
// Fast path
for (int i = 0; i < channelReaders.Length; i++)
{
if (channelReaders[i].TryRead(out var item)) return (item, i);
}
// Slow path
var locker = new object();
int resultIndex = -1;
T resultItem = default;
while (true)
{
using (var cts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken, default))
{
bool availableAny = false;
Task[] tasks = channelReaders
.Select(async (reader, index) =>
{
try
{
bool available = await reader.WaitToReadAsync(cts.Token)
.ConfigureAwait(false);
if (!available) return;
}
catch // Cancellation, or channel completed with exception
{
return;
}
availableAny = true;
lock (locker) // Take from one reader only
{
if (resultIndex == -1 && reader.TryRead(out var item))
{
resultIndex = index;
resultItem = item;
cts.Cancel();
}
}
})
.ToArray();
await Task.WhenAll(tasks).ConfigureAwait(false);
if (resultIndex != -1) return (resultItem, resultIndex);
cancellationToken.ThrowIfCancellationRequested();
if (!availableAny) throw new ChannelClosedException(
"All channels are marked as completed.");
}
}
}

相关内容

最新更新