邮箱处理器,最新消息



设置与此类似。一个代理(dataSource)正在生成数据,而单个代理(dataProcessor)正在处理数据。生成的数据比dataProcessor处理的要多得多,我对处理所有消息不感兴趣,只处理最新的一段数据。

Jon Harrop在那里提出的一个可能的解决方案是"贪婪地吃掉收件箱中的所有邮件,并丢弃除最近的邮件外的所有邮件"。另一种方法不是侦听所有消息,而是侦听dataProcessorPostAndReplydataSource以获得最新的数据。

这些方法的优点和缺点是什么?

这是一个有趣的问题,很可能有几种可能的观点。我认为最值得注意的方面是,选择将影响您如何在两个组件之间的接口上设计API:

  1. 在"全部消费"方法中,生产者有一个非常简单的API,每当产生价值时,它就会触发一些事件,而你的消费者会订阅它。这意味着你可以让其他订阅者收听生产者的更新,并在这个问题上做一些其他事情,而不是你的消费者。

  2. 在"获取最新消息的调用"方法中,可能需要编写生产者,以便保持当前状态并丢弃旧值。然后它将提供阻塞异步API以获得最新的值。不过,它仍然可以为其他消费者公开一个事件。消费者将需要主动轮询更改(在某种繁忙的循环中)。

  3. 您也可以在"Consume all"中创建一个具有事件的生产者,但随后创建另一个组件,该组件侦听任何给定的事件,保留最新的值,并通过对任何其他客户端的阻塞异步调用使其可用。

这里有一些我能想到的优点/缺点:

  • 在(1)中,生产者非常简单;消费者更难书写
  • 在(2)中,生产者需要做更多的工作,但消费者很简单
  • 在(3)中,您添加了另一个层,但使用了一种相当可重用的方式

在检查它不会影响性能后,我可能会选择(2)(如果我只需要一个数据源)或(3)。

至于(3),我的想法大概是这样的:

type KeepLastMessage<'T> = 
| Update of 'T
| Get of AsyncReplyChannel<'T>
type KeepLast<'T>(initial:'T, event:IObservable<'T>) = 
let agent = MailboxProcessor.Start(fun inbox -> 
let rec loop last = async {
let! msg = inbox.Receive()
match msg with 
| Update last -> return! loop last
| Get ch -> ch.Reply(last); return! loop last }
loop initial)
member x.AsyncGet() = agent.PostAndAsyncReply(Get)

相关内容

  • 没有找到相关文章

最新更新