设置与此类似。一个代理(dataSource
)正在生成数据,而单个代理(dataProcessor
)正在处理数据。生成的数据比dataProcessor
处理的要多得多,我对处理所有消息不感兴趣,只处理最新的一段数据。
Jon Harrop在那里提出的一个可能的解决方案是"贪婪地吃掉收件箱中的所有邮件,并丢弃除最近的邮件外的所有邮件"。另一种方法不是侦听所有消息,而是侦听dataProcessor
到PostAndReply
dataSource
以获得最新的数据。
这些方法的优点和缺点是什么?
这是一个有趣的问题,很可能有几种可能的观点。我认为最值得注意的方面是,选择将影响您如何在两个组件之间的接口上设计API:
-
在"全部消费"方法中,生产者有一个非常简单的API,每当产生价值时,它就会触发一些事件,而你的消费者会订阅它。这意味着你可以让其他订阅者收听生产者的更新,并在这个问题上做一些其他事情,而不是你的消费者。
-
在"获取最新消息的调用"方法中,可能需要编写生产者,以便保持当前状态并丢弃旧值。然后它将提供阻塞异步API以获得最新的值。不过,它仍然可以为其他消费者公开一个事件。消费者将需要主动轮询更改(在某种繁忙的循环中)。
-
您也可以在"Consume all"中创建一个具有事件的生产者,但随后创建另一个组件,该组件侦听任何给定的事件,保留最新的值,并通过对任何其他客户端的阻塞异步调用使其可用。
这里有一些我能想到的优点/缺点:
- 在(1)中,生产者非常简单;消费者更难书写
- 在(2)中,生产者需要做更多的工作,但消费者很简单
- 在(3)中,您添加了另一个层,但使用了一种相当可重用的方式
在检查它不会影响性能后,我可能会选择(2)(如果我只需要一个数据源)或(3)。
至于(3),我的想法大概是这样的:
type KeepLastMessage<'T> =
| Update of 'T
| Get of AsyncReplyChannel<'T>
type KeepLast<'T>(initial:'T, event:IObservable<'T>) =
let agent = MailboxProcessor.Start(fun inbox ->
let rec loop last = async {
let! msg = inbox.Receive()
match msg with
| Update last -> return! loop last
| Get ch -> ch.Reply(last); return! loop last }
loop initial)
member x.AsyncGet() = agent.PostAndAsyncReply(Get)