目前我正在处理管道数据流,其中除阶段 1 之外的每个阶段都是运行async
的使用者和生产者。我有对象"流动"在我的管道中,这些对象引用了项目。在第 3 阶段,我想创建一个循环并缓冲满足特殊条件的所有对象(阶段循环)。
如果新对象进入(阶段 3),而当前有其他对象缓冲(阶段循环),我想检查它们是否在其引用项中匹配,如果是,则将它们发布到阶段循环的BufferBlock
。
问题是,如何从阶段 3 中检查阶段循环中所有对象的引用项?
管道有点像这样:
Incoming objects ->
BufferBlock1 -> Parsing (Stage2) ->
BufferBlock2 -> Processing (Stage3) ->
BufferBlock3 -> Stage Loop ->
Back to BufferBlock 2
你的链中真的不需要那么多BufferBlock
。TPL 数据流包含一个TransformBlock
,它封装了BufferBloсk
和ActionBlock
逻辑,并具有用于处理消息的输出块。
至于循环,您可以使用静态扩展方法将块相互链接,因此这可能看起来像
stage2.LinkTo(stage3, CheckForExistingProcessing);
stage2.LinkTo(stage4);
Jerestage4
是未通过检查且必须循环处理的消息的队列。您可以设置其他ActionBlock
,或者,也许只是使用TransformBlock
将消息再次发送到适当的阶段。我认为您也可以引入重试检查,因为某些消息可能根本无法处理,因此有些原因。
另外,正如您所说,您有async
逻辑,您可能应该SendAsync
消息而不是Post
消息(您也可以将重载与CancellationToken
一起使用):
// asynchronously wait for a sending with resending attempts
await stage1.SendAsync(m);
// asynchronously wait for a sending with resending attempts with possible cancellation
await stage2.SendAsync(m, token);
Post
方法是同步的,如果目标不接受消息,则会丢弃消息,比较SendAsync
方法,即使目标现在无法接受消息,该方法也会尝试传递消息。