通过回调消息代理将推送公开为IAsyncEnumerable



我正在使用一个第三方库,该库充当发布子消息代理的接口。经纪人是Solace PubSub+。

对于订户,供应商库采用";通过回叫推送消息";图案

我正在围绕供应商库编写一个自己的包装器库,使其他开发人员更容易使用它(隐藏库与网络通信的所有内部信息等等)。

同样,我认为将订阅者提要公开为IAsyncEnumerable可能会有所帮助,我认为这可能是System.Threading.Channels的一个很好的用例。我有两个担忧:

  1. 这里的频道合适吗?还是我设计得太多了?也就是说,还有更多的";C#惯用语";回拨电话的方式
  2. 我的EnumerableBroker包装器实现是安全的,还是在某个地方陷入了异步陷阱

我意识到第一个问题可能比SO更适合CodeReview,但由于这个问题的答案也与第二个问题有关,因此将它们放在一起似乎是合适的。值得注意的是:我正在避免IObservable/Rx,因为我的目标是使我的接口比供应商的更基本,而不是要求其他开发人员和我自己学习Rx!理解生产者和消费者的过程是如何独立的,对于中间的通道来说也是微不足道的,而对于可观察到的,我的第一个心理过程是";好的,那么生产者和消费者仍然是独立的吗?乍一看,我现在似乎必须学习调度器。。。天哪,我用await foreach怎么样">

以下是一个没有EnumerableBroker:的消费消息的最小模型

// mockup of third party class
private class Broker
{
// mockup of how the third party library pushes messages via callback
public void Subscribe(EventHandler<int> handler) => this.handler = handler;
//simulate the broker pushing messages. Not "real" code
public void Start()
{
Task.Run
(
() =>
{
for (int i = 0; !cts.Token.IsCancellationRequested; i++)
{
// simulate internal latency
Thread.Sleep(10);
handler?.Invoke(this, i);
}
}, cts.Token
);
}
public void Stop() => cts.Cancel();
private CancellationTokenSource cts = new();
private EventHandler<int> handler;
}
private static async Task Main()
{
var broker = new Broker();
broker.Subscribe((_, msg) => Console.WriteLine(msg));
broker.Start();
await Task.Delay(1000);
broker.Stop();
}

现在使用EnumerableBroker的最小复制(仍然使用上面列出的相同模拟Broker类)。这里至少有一个好处是,如果订阅者需要做大量工作来处理消息,它不会占用代理的线程——至少在缓冲区填满之前是这样。这个似乎运行起来没有错误,但我已经学会了警惕我对异步的有限掌握。

private class EnumerableBroker
{
public EnumerableBroker(int bufferSize = 8)
{
buffer = Channel.CreateBounded<int>
(
new BoundedChannelOptions(bufferSize) { SingleReader = true,
SingleWriter = true }
);
}
public IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
{
broker.Subscribe
(
// switched to sync per Theodor's comments
(_, args) => buffer.Writer.WriteAsync(args, ct).AsTask().Wait()
);
ct.Register(broker.Stop);
broker.Start();
return buffer.Reader.ReadAllAsync(ct);
}
private readonly Channel<int> buffer;
private readonly Broker broker = new();
}
private static async Task Main()
{
var cts = new CancellationTokenSource();
var broker = new EnumerableBroker();
cts.CancelAfter(1000);
try
{
await foreach (var msg in broker.ReadAsync(cts.Token))
{
Console.WriteLine(msg);
}
}
catch (OperationCanceledException) { }
}

我是否对此进行了过度设计?

否。Channel正是实现此功能所需的组件类型。这是一个非常简单的机制。它基本上是BlockingCollection<T>类的异步版本,具有一些额外的功能(如Completion属性)和一个新奇的API(ReaderWriter门面)。

我的EnumerableBroker包装器实现是否安全,或者我是否在某个地方陷入了异步陷阱?

是的,有一个陷阱,您已经落入其中。SingleWriter = true配置意味着在飞行中最多允许一个WriteAsync操作同时进行。在发布下一个WriteAsync之前,必须完成上一个。通过使用async void委托订阅broker,您实际上为代理推送的每条消息创建了一个单独的编写器(生产者)。组件很可能会通过抛出InvalidOperationException之类的东西来抱怨这种滥用。解决方案是而不是切换到SingleWriter = false。这将通过创建一个外部且效率极低的队列来绕过Channel的有限容量,该队列包含不适合Channel的内部队列的消息。解决方案是重新思考您的缓冲策略。如果无法缓冲无限数量的消息,则必须丢弃消息,或者抛出异常并杀死消费者。与await buffer.Writer.WriteAsync不同,最好与bool accepted = buffer.Writer.TryWrite同步馈送信道,并在acceptedfalse的情况下采取适当的行动。

您应该记住的另一个考虑因素是ChannelReader.ReadAllAsync方法是消耗。这意味着,如果同一频道有多个读者/消费者,则每条消息将只传递给其中一个消费者。换句话说,每个消费者将接收到该频道消息的部分子集。你应该把这件事告诉你的同事,因为多次列举同一个IAsyncEnumerable<T>是很琐碎的。毕竟,IAsyncEnumerable<T>只不过是IAsyncEnumerator<T>的工厂。

最后,您可以在IAsyncEnumerator<T>的枚举终止时自动终止订阅,而不是通过CancellationToken来控制每个订阅的生存期,从而使同事的生活更轻松。当await foreach循环以任何方式结束时(如通过break或通过异常),相关联的IAsyncEnumerator<T>被自动处置。C#语言巧妙地将DisposeAsync调用与迭代器的finally块挂钩,如果try/finaly块包装了屈服循环。你可以利用这样一个伟大的功能:

public async IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
{
broker.Subscribe
(
//...
);
broker.Start();
try
{
await foreach (var msg in buffer.Reader.ReadAllAsync(ct))
{
yield return msg;
}
}
finally
{
broker.Stop();
}
}

这只是在使用Channels不需要太多代码的意义上被过度设计的。一种典型的模式是只使用方法,这些方法接受ChannelReader作为输入并返回ChannelReader作为输出,方法本身创建并拥有输出通道。这使得将阶段组成一个管道变得非常容易,尤其是如果这些方法是扩展方法的话。

在这种情况下,您的代码可以重写为:

static ChannelReader<int> ToChannel(this Broker broker, 
int limit,CancellationToken token=default)
{
var channel=Channel.CreateBounded<int>(limit);
var writer=channel.Writer;
broker.Subscribe((_, args) =>{
writer.TryWrite(args, token);
});
token.Register(()=>writer.Complete());
return channel;
}

这将丢失任何超过限制的消息。如果你的Broker理解Task,你可以使用:

broker.Subscribe(async (_, args) =>{
await writer.WriteAsync(args, token);
});

如果它不理解任务,并且你不能失去任何东西,那么更好的解决方案可能是使用un有界通道,并在稍后阶段处理暂停/恢复。你已经问过类似的问题了。

否则,您将不得不阻止回调:

broker.Subscribe(async (_, args) =>{
writer.WriteAsync(args, token).AsTask().Wait();
});

不过,这不是一个理想的解决方案。

在这两种情况下,您都可以使用读取器生成的数据:

var token=cts.Token;
var reader=broker.ToChannel(10,token);
await foreach(var item in reader.ReadAllAsync(token))
{
...
}

相关内容

  • 没有找到相关文章

最新更新