TPL 数据流 - 条件循环



目前我正在处理管道数据流,其中除阶段 1 之外的每个阶段都是运行async的使用者和生产者。我有对象"流动"在我的管道中,这些对象引用了项目。在第 3 阶段,我想创建一个循环并缓冲满足特殊条件的所有对象(阶段循环)。

如果新对象进入(阶段 3),而当前有其他对象缓冲(阶段循环),我想检查它们是否在其引用项中匹配,如果是,则将它们发布到阶段循环的BufferBlock

问题是,如何从阶段 3 中检查阶段循环中所有对象的引用项?

管道有点像这样:

Incoming objects ->  
BufferBlock1 -> Parsing (Stage2) ->  
BufferBlock2 -> Processing (Stage3) ->
BufferBlock3 -> Stage Loop ->  
Back to BufferBlock 2

你的链中真的不需要那么多BufferBlock。TPL 数据流包含一个TransformBlock,它封装了BufferBloсkActionBlock逻辑,并具有用于处理消息的输出块。

至于循环,您可以使用静态扩展方法将块相互链接,因此这可能看起来像

stage2.LinkTo(stage3, CheckForExistingProcessing);
stage2.LinkTo(stage4);

Jerestage4是未通过检查且必须循环处理的消息的队列。您可以设置其他ActionBlock,或者,也许只是使用TransformBlock将消息再次发送到适当的阶段。我认为您也可以引入重试检查,因为某些消息可能根本无法处理,因此有些原因。

另外,正如您所说,您有async逻辑,您可能应该SendAsync消息而不是Post消息(您也可以将重载与CancellationToken一起使用):

// asynchronously wait for a sending with resending attempts
await stage1.SendAsync(m);
// asynchronously wait for a sending with resending attempts with possible cancellation
await stage2.SendAsync(m, token);

Post方法是同步的,如果目标不接受消息,则会丢弃消息,比较SendAsync方法,即使目标现在无法接受消息,该方法也会尝试传递消息。

最新更新