当一个渠道生产者可以随时关闭整个渠道时,渠道消费者如何从多个渠道生产者那里获得一切



我是System.Threading.Channels的新手。我有以下消费者代码:

await foreach (var thing in this.Reader.ReadAllAsync(cancellationToken)
.ConfigureAwait(false))
{
await this.HandleThingAsync(thing, cancellationToken).ConfigureAwait(false);
}

当消费像这样的单一生产商生产的东西时,这似乎很有效

var things = await this.GetThingsAsync(cancellationToken).ConfigureAwait(false);
await foreach (var thing in things.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
await this.Writer.WriteAsync(thing, cancellationToken).ConfigureAwait(false);
}
this.Writer.Complete();

但是,当我尝试添加相同通用形式的第二个生产者时,一旦两个生产者中的一个完成(并调用this.Writer.Complete()(,另一个生产者仍然需要添加的任何内容都将被拒绝,因为通道已经关闭。这是一个问题,因为我希望读者阅读所有内容,而不仅仅是所有内容,直到任何一个制片人都没有更多的东西可以制作为止。

如何应对这种情况?是否存在一些内置的或其他的";标准";方法例如;冷凝器";暴露多个Channel.Writer对象(每个"真实"生产者一个(和单个Channel.Reader(单个"真实"消费者一个(的通道?

我不认为有一种方法可以调用"标准";。Channel<T>是一种可以以多种不同方式使用的工具,与TaskSemaphoreSlim非常相似。在您的情况下,您可以使用这样的计数器来传播所有生产者的完成情况:

int producersCount = X;
//...
await foreach (var thing in things)
await channel.Writer.WriteAsync(thing);
if (Interlocked.Decrement(ref producersCount) == 0) channel.Writer.Complete();

或者,如果每个生产者都是Task,您可以将一个延续附加到所有这些任务的组合中,如下所示:

var producers = new List<Task>();
//...
_ = Task.WhenAll(producers).ContinueWith(_ => channel.Writer.Complete(),
default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

上面的丢弃(_(已被用于传达ContinueWith延续已以即发即弃方式启动。如果你不喜欢像我一样把未观察到的任务抛到九霄云外,你可以用这样的async void方法来处理生产者的完成:

var producers = new List<Task>();
//...
HandleProducersCompletion();
//...
async void HandleProducersCompletion()
{
try { await Task.WhenAll(producers); }
finally { channel.Writer.Complete(); }
}

这样,channel.Writer.Complete();调用引发的异常将无法处理,并将使进程崩溃。考虑到替代方案,这可以说是一件好事,因为这个过程毫无明显原因地陷入僵局。

我最终根据我的"通道冷凝器";我在最初的问题中提到的想法。它可能很可怕,也可能不可怕和/或充满漏洞,但至少到目前为止,它似乎以一种对我来说相当自然和不引人注目的方式完成了这项工作:

using Nito.AsyncEx;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Rwv37.System.Threading.Channels
{
public class ChannelCondenser<T>
{
private bool IsGoing { get; set; }
private AsyncLock IsGoingLock { get; init; }
private ConcurrentBag<Channel<T>> IncomingChannel { get; init; }
private Channel<T> OutgoingChannel { get; init; }
public ChannelCondenser()
{
this.IsGoingLock = new AsyncLock();
this.IncomingChannel = new();
this.OutgoingChannel = Channel.CreateUnbounded<T>();
}
public async Task GoAsync(CancellationToken cancellationToken = default)
{
using (await this.IsGoingLock.LockAsync(cancellationToken).ConfigureAwait(false))
{
if (this.IsGoing)
{
throw new System.InvalidOperationException("Cannot go - already going!");
}
this.IsGoing = true;
}
List<Task> tasks = new();
foreach (var incomingChannel in this.IncomingChannel)
{
tasks.Add(this.HandleIncomingChannelAsync(incomingChannel, cancellationToken));
}
await Task.WhenAll(tasks).ConfigureAwait(false);
this.OutgoingChannel.Writer.Complete();
}
public ChannelWriter<T> AddIncomingChannel()
{
using (this.IsGoingLock.Lock())
{
if (this.IsGoing)
{
throw new System.InvalidOperationException("New incoming channels cannot be added while going!");
}
}
Channel<T> incomingChannel = Channel.CreateUnbounded<T>();
this.IncomingChannel.Add(incomingChannel);
return incomingChannel.Writer;
}
public ChannelReader<T> GetOutgoingChannel()
{
return this.OutgoingChannel.Reader;
}
private async Task HandleIncomingChannelAsync(Channel<T> incomingChannel, CancellationToken cancellationToken)
{
await foreach (var item in incomingChannel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
await this.OutgoingChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
}
}
}
}

消费者和生产者的使用与我最初的问题中显示的完全没有变化。

在它们之外,我唯一需要改变的是如何构建使用它们的类。消费者结构从。。。

private Channel<Thing> WantedThingsChannel { get; init; }
(...)
this.WantedThingsChannel = Channel.CreateUnbounded<Thing>();
this.WantedThingsHandler = new(this.WantedThingsChannel.Reader);

到…

private ChannelCondenser<Thing> WantedThingsCondenser { get; init; }
(...)
this.WantedThingsCondenser = new();
this.WantedThingsHandler = new(this.WantedThingsCondenser.GetOutgoingChannel());

同样,生产商的结构也从。。。

this.WantedThingsRetriever = new(this.WantedThingsChannel.Writer);

到…

this.WantedThingsRetriever = new(this.WantedThingsCondenser.AddIncomingChannel());

哦,不,等等,我撒谎了。它们之外的另一个变化是:我的程序的主Task.WhenAll被更改了,所以它额外地等待ChannelCondenser。所以,从。。。

List<Task> tasks = new()
{
this.WantedThingsHandler.GoAsync(cancellationToken),
this.WantedThingsRetriever.GoAsync(cancellationToken),
};

到…

List<Task> tasks = new()
{
this.WantedThingsCondenser.GoAsync(cancellationToken),
this.WantedThingsHandler.GoAsync(cancellationToken),
this.WantedThingsRetriever.GoAsync(cancellationToken),
};

最新更新