具有LIFO逻辑运行的邮箱制处



我正在学习有关f#代理(MailboxProcessor)。

我正在处理一个非常规的问题。

  • 我有一个代理(dataSource),它是流数据的来源。数据必须由代理数组(dataProcessor)处理。我们可以将dataProcessor视为某种跟踪设备。
  • 数据可能比dataProcessor可以处理其输入的速度更快地流动。
  • 可以延迟一些延迟。但是,我必须确保代理商在其工作之上,并且不会被堆积在过时的观察之下

我正在探索解决这个问题的方法。

第一个想法是在dataSource中实现堆栈(LIFO)。当dataProcessor可用于接收和处理数据时,dataSource将发送可用的最新观察值。该解决方案可能起作用,但可能会变得复杂,因为可能需要阻止和重新激活dataProcessor。并将其状态传达给dataSource,从而导致了两条通信问题。这个问题可能归结为消费者问题中的blocking queue,但我不确定。

第二个想法是让dataProcessor照顾消息分类。在此体系结构中,dataSource将简单地在dataProcessor的队列中发布更新。dataProcessor将使用Scan获取队列中可用的最新数据。这可能是要走的路。但是,我不确定在MailboxProcessor的当前设计中是否可以清除邮件队列,从而删除较旧的过时的消息。此外,在这里,写着:

不幸的是,F#当前版本中的tryscan函数 用两种方式破裂。首先,重点是指定超时 但是实施实际上并没有尊重它。具体来说, 无关的消息重置计时器。其次,与其他扫描一样 功能,消息队列在锁定下进行检查,以防止任何 在扫描持续时间内发布的其他线程,可以是 任意长的时间。因此,tryscan功能本身 倾向于锁定并发系统,甚至可以引入僵局 因为在锁定内评估了呼叫者的代码(例如发布 从函数参数到扫描或tryscan可以死锁代理 当锁定块下的代码等待获取锁时, 已经下了)。

让最新的观察结果反弹可能是一个问题。这篇文章的作者@Jon Harrop提出

我设法围绕着它进行了架构,而最终的体系结构实际上更好。本质上,我热切地使用我自己的本地队列过滤所有消息。

这个想法肯定值得探索,但是,在开始使用代码之前,我欢迎一些有关如何构建解决方案的投入。

谢谢。

听起来您可能需要邮箱处理器的破坏性扫描版本,我在博客系列中使用tpl dataflow实现了此功能您可能对。

我的博客目前正在供维护,但我可以指向您的帖子以宣传格式。

part1
部分2
part3

您也可以在github上查看代码

我还在潜伏的恐怖帖子中写了有关扫描问题的文章

希望有帮助...

tl; dr 我会尝试以下操作:从fsharp.actor或Zach Bray的博客文章中获取邮箱实现,替换consurrentStack(加上添加一些有限的容量逻辑))并使用此更改的代理作为调度程序,将消息从数据源传递给以普通Mbps或Actors实施的DataProcessors的传递。

tl; dr2 如果工人是一种稀缺和缓慢的资源,我们需要处理一条消息,该消息是工人准备就绪时的最新消息堆栈而不是队列(具有一些有限的容量逻辑)加上工人的阻止。调度员脱离了现成的工人,然后从堆栈中弹出一条消息,然后将此消息发送给工人。完成工作后,工人准备就绪时将其招募到队列(例如let! msg = inbox.Receive()之前)。然后,调度器消费者线程将阻止直到任何工人准备就绪,而生产者线程则保持有限的堆栈更新。(有限的堆栈可以用阵列 偏移 大小在锁中完成,而下方太复杂了)

详细信息

MailboxProcessor的设计只有一个消费者。甚至在此处的MBP的源代码中进行了评论(搜索"龙"一词:))

如果将数据发布到MBP,则只有一个线程可以从内部队列或堆栈中获取。在您的特定用例中,我会直接使用concurrentStack或更好地包装到BlockingCollection中:

  • 它将允许许多并发的消费者
  • 它非常快,线安全
  • BlockingCollection具有BoundedCapacity属性,可允许您限制集合的大小。它可以在Add上投放,但是您可以捕获或使用TryAdd。如果A是主要堆栈,而B是备用,则在false AddTryAdd到a,然后将两者与互锁交换。ExChange,然后在A中处理需要的消息,清除它,制作新的备用 - 或使用三个堆栈如果处理可能比B更长,则可能会再次变得完整;通过这种方式,您不会阻止并且不会丢失任何消息,而是可以丢弃不需要的消息是一种受控的方式。

BlockingCollection具有诸如AddToany/Takefromany之类的方法,该方法可用于阻止收缩阵列。这可能会有所帮助,例如:

  • DataSource通过ConcurRentStack实现(BCCS)
  • 为BlockingCollection生成消息
  • 另一个线程会消耗来自BCC的消息,并将其发送到一系列处理BCCS。您说有很多数据。您可能会牺牲一个线程,以无限期地阻止和派遣您的消息
  • 每个处理代理都有自己的BCC或作为代理/Actor/MBP实现的,调度员向其发布消息。在您的情况下,您需要仅向一个处理器发送消息,因此您可以将处理代理存储在圆形缓冲区中,以始终向最近使用的处理器发送消息。

类似的东西:

            (data stream produces 'T)
                |
            [dispatcher's BCSC]
                |
            (a dispatcher thread consumes 'T  and pushes to processors, manages capacity of BCCS and LRU queue)
                 |                               |
            [processor1's BCCS/Actor/MBP] ... [processorN's BCCS/Actor/MBP]
                 |                               |
               (process)                         (process)

您可能需要阅读有关堆数据结构的而不是ConcurrentStack。如果您需要消息的某些属性,例如时间戳记,而不是按照它们到达堆栈的顺序(例如,如果在运输和到达顺序中可能存在延迟<>创建顺序),您可以使用HEAP获得最新消息。

如果您仍然需要代理语义/API,则除了Dave的链接外,还可以阅读几个来源,并以某种方式对多个并发消费者采用实现:

  • Zach Bray关于有效参与者实施的一篇有趣的文章。在那里,您确实需要用async { execute true } |> Async.Start或类似的行替换(在注释// Might want to schedule this call on another thread.下)execute true行CC_27,因为否则生产线程将消耗线程 - 对单个快速生产商不利。但是,对于像上面所述的调度程序,这正是所需的。

  • fsharp.actor (又称Fakka)开发分支和FSHARP MPB源代码(上面的第一个链接)对于实现详细信息可能非常有用。fsharp.actors库已经冻结了几个月,但是开发分支有一些活动。

  • 在这种情况下

我有一个类似的用例,在过去的两天中,我研究了在F#代理/演员上找到的所有内容。这个答案是我自己尝试这些想法的一种待办事项,其中一半是在编写过程中出生的。

最简单的解决方案是贪婪地吃全部收件箱中的消息到达时,除了最新的时,所有人都会丢弃并丢弃所有内容。使用TryReceive轻松完成:

let rec readLatestLoop oldMsg =
  async { let! newMsg = inbox.TryReceive 0
          match newMsg with
          | None -> oldMsg
          | Some newMsg -> return! readLatestLoop newMsg }
let readLatest() =
  async { let! msg = inbox.Receive()
          return! readLatestLoop msg }

面对相同的问题时,我构建了一个更复杂,更有效的解决方案,我称为可取消流,并在此处的F#期刊文章中进行了描述。这个想法是开始处理消息,然后如果将它们取代,则取消该处理。如果进行重大处理,这将显着改善并发。

相关内容

  • 没有找到相关文章

最新更新