是否可以和/或建议使用多个系统.线程.一个对象中的通道



我正在开发一个.net核心3.0 web应用程序,并决定使用System。线程。单例服务中的通道。我的作用域请求服务的顶层注入这个单例来访问它的通道。

我决定使用这种模式将请求(为其他连接的客户端生成实时更新)与这些更新的执行解耦。

ONE通道在对象中的实现有很多例子。

有人能告诉我在我的单身汉中使用多个频道是否可能/可取吗?

我还没有遇到任何创建多个通道并在创建单例时"启动"它们的问题。我只是还没有到可以在单例上测试多个客户端请求是否能正常工作的地步。(或者根本没有?…)

我使用多个通道的主要动机是希望单例根据通道中项目的类型做不同的事情。

public class MyChannelSingleton 
{
public Channel<MyType> TypeOneChannel = Channel.CreateUnbounded<MyType>();
public Channel<MyOtherType> TypeTwoChannel = Channel.CreateUnbounded<MyOtherType>();
public MyChannelSingleton() 
{
StartChannels();
}
private void StartChannels() 
{
// discarded async tasks due to calling in ctor
_ = StartTypeOneChannel();
_ = StartTypeTwoChannel();
}
private async Task StartTypeOneChannel()
{
var reader = TypeOneChannel.Reader;
while (await reader.WaitToReadAsync())
{
if (reader.TryRead(out MyType item))
{
// item is sucessfully read from channel
}
}
}
private async Task StartTypeTwoChannel()
{
var reader = TypeTwoChannel.Reader;
while (await reader.WaitToReadAsync())
{
if (reader.TryRead(out MyOtherType item))
{
// item is sucessfully read from channel
}
}
}
}

我还希望永远不要"完成"通道,并在应用程序的整个生命周期内都可以使用它们。

只要正确使用,您可以随心所欲地使用。事实上,使用公开处理管道的后台服务(本质上是singleton)是在中使用它们的一种非常常见的方式。NET核心。

通道不仅仅是异步队列。它们类似于DataFlow块——它们可以用于创建处理管道,每个块/工作程序处理来自输入缓冲区/ChannelReader的数据,并将结果转发到输出缓冲区/CannelWriter。DataFlow块通过任务本身处理异步处理。有了渠道,我们需要自己处理工人的任务。

我们需要记住的一个非常重要的概念是,频道不能直接访问。事实上,在几乎所有情况下,它们都不应该作为字段或属性公开。在大多数情况下,只需要一个ChannelReader。在某些情况下,例如在管道的顶部,ChannelWriter可能会被暴露。或者不是。

单个工人/步骤

一个典型的工人步骤看起来像这个

private ChannelReader<MyType2> Step1(ChannelReader<MyType> reader,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<MyOtherType>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var item from reader.ReadAllAsync(token))
{
MyType2 result=........;
await writer.WriteAsync(result);
}
},token).ContinueWith(t=>channel.TryComplete(t));
return channel.Reader;    
}

需要注意的一些事项:

  • 如果需要,可以创建多个任务,并使用Task.WhenAll等待所有工作人员完成,然后再关闭通道
  • 如果管道不够快,可以使用有界通道来防止大量消息累积
  • 如果取消收到信号,则从输入通道读取工作任务都将被取消
  • 当辅助任务完成时,无论是因为它被取消还是抛出,通道都将关闭
  • 当"头"通道完成时,完成将从一个步骤流到下一个步骤

组合步骤

通过将一个输出读取器传递给另一个输入读取器,可以组合多个步骤,例如:

var cts=new CancelaltionTokenSource();
var step1=Step1(headReader,cts.Token);
var step2=Step2(step1,cts.Token);
var step3=Step3(step2,cts.Token);
...
await stepN.Completion;

CancellationTokenSource可用于提前结束管道或设置超时以防止管道挂起。

管道头

"头"阅读器可能来自"适配器"方法,如:

private ChannelReader<T> ToChannel(IEnumerable<T> input,CancellationToken token)
{
var channel=Channel.CreateUnbounded<T>();
var writer=channel.Writer;
foreach(var item from input)
{
if (token.IsCancellationRequested)
{
break;
}
writer.TryWrite(result);
}
//No-one else is going to complete this channel
channel.Complete();
return channel.Reader;    
}

在后台服务的情况下,我们可以使用一种服务方法将输入"发布"到头部通道,例如:

class MyService
{
Channel<MyType0> _headChannel;
public MyService()
{
_headChannel=Channel.CreateBounded<MyType0>(5);
}
public async Task ExecuteAsync(CancellationToken token)
{
var step1=Step1(_headChannel.Reader,token);
var step2=Step2(step1,token);        
await step2.Completion;
}
public Task PostAsync(MyType0 input)
{
return _headChannel.Writer.WriteAsync(input);
}
public Stop()
{
_headChannel.Writer.TryComplete();
}
...
}

我故意使用看起来像BackgroundService方法名称的方法名称。StartAsync或ExecuteAsync可用于设置管道。StopAsync可用于发出其完成的信号,例如当最终用户点击Ctrl+时。

排队的BackgroundService示例中显示的另一种有用的技术是注册一个接口,客户端可以使用该接口来发布消息,而不是直接访问服务类,例如:

interface IQueuedService<T>
{
Task PostAsync(T input);
}

与系统相结合。林克。异步

ReadAllAsync()方法返回一个IAsyncEnumerable<T>,这意味着我们可以在System中使用运算符。林克。异步类似Where或Take来过滤、批处理或转换消息,例如:

private ChannelReader<MyType> ActiveOnly(ChannelReader<MyType> reader,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<MyType>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
var inpStream=reader.ReadAllAsync(token)
.Where(it=>it.IsActive);
await foreach(var item from inpStream)
{
await writer.WriteAsync(item);
}
},token).ContinueWith(t=>channel.TryComplete(t));
return channel.Reader;    
}

Channel<T>只是一个线程安全的异步队列。它本身不做任何处理,它只是一个被动的内存FIFO存储。你可以随心所欲地拥有它们。

您可以利用Channel分别公开ReaderWriter这一事实,将类的客户端的访问限制在所需的最低功能。换句话说,您可以考虑公开ChannelWriter<T>ChannelReader<T>类型的属性,而不是公开Channel<T>类型的属性。

此外,创建无边界通道时也应小心。一个被误用的通道很容易使您的应用程序成为OutOfMemoryException的牺牲品。

暴露类型ChannelReader<T>的特性的替代方案可以是暴露IAsyncEnumerable<T>s。

很遗憾,我找不到源代码。称文档稀疏是一种轻描淡写的说法。所以我最多可以告诉你"如果是我的课,我会怎么做"。

内存中有多个通道(特别是无边界通道)的最大问题是内存碎片会导致早期OOM。事实上,即使是一个无界的,一个大问题就是必须扩大收藏。CCD_ 14不过是一个包装CCD_。无界列表的另一个问题是,索引迟早会用完。

我该如何解决这个问题?链接列表。在大约90%的情况下,链表将是我考虑的最后一个集合。剩下的10%是队列和类队列结构。通道看起来非常像队列。在这10%的情况中,在9%的情况下,我只会使用Queue实现所做的任何操作。这是剩下的1%。

对于随机访问,链表是最糟糕的集合。对于队列,这是可行的。但是在避免.NET中与碎片化相关的OOM方面呢?为了最大限度地降低增长成本?为了绕过硬数组限制?在那里,链表是绝对不可战胜的。

如果它不这样做?它应该是可行的,使您自己的版本的通道,做,并只是替换它。

最新更新