我试图在一段时间内没有事件时实现超时。
场景:
我有一个对象,每次收到消息时都会引发一个事件。我想在一段时间内(比如说,20秒)没有收到消息(OnReceived事件)时做出反应
这就是我迄今为止拥有的
var observable = Observable.FromEventPattern<BasicDeliverEventHandler>(
handler => _innerConsumer.Received += OnReceived,
handler => _innerConsumer.Received -= OnReceived);
var timeout = observable.Timeout(TimeSpan.FromSeconds(20));
using (timeout.Subscribe(_ => { },
exception =>
Tracer.Warning("Eventing Consumer timeout : {0}", exception.Message)))
{ }
我正在从EventPattern创建一个可观察的。然后,使用超时。我不明白的是如何从Timeout中获取异常。当这种情况发生时,我想做出反应。
我不认为Subcribe方法是正确的方法,但这是我从文档中得到的。如果这不是正确的,我愿意接受建议或其他选择。
提前感谢
Timeout
有问题,因为它终止了序列。Throttle
是您想要的,但您还需要插入一个开始元素,以防根本没有事件。
我将事件转换为Unit.Default-当你不在乎发生了什么,只在乎发生了什么时,这很有用-并使用StartWith
来设置油门:
var timeout = observable.Select(_ => Unit.Default)
.StartWith(Unit.Default)
.Throttle(TimeSpan.FromSeconds(20);
var subs = timeout.Subscribe(_ => Console.WriteLine("Timeout!"));
出于兴趣,我也有一个类似的解决方案来检测断开连接的客户端——这次为多个源提供一个超时通知:http://www.zerobugbuild.com/?p=230
让我们看看您的代码。
var observable =
Observable.FromEventPattern<BasicDeliverEventHandler>(
handler => _innerConsumer.Received += OnReceived,
handler => _innerConsumer.Received -= OnReceived
);
var timeout = observable.Timeout(TimeSpan.FromSeconds(20));
using (timeout.Subscribe(
_ => { },
exception =>
Tracer.Warning("Eventing Consumer timeout : {0}", exception.Message)))
{
}
我们可以这样重写订阅逻辑:
var subscription = timeout.Subscribe(
_ => { }
exception =>
Tracer.Warning("Eventing Consumer timeout : {0}", exception.Message)
);
subscription.Dispose(); // This is bad
由于您的订阅将立即被处理,您的观察者不会收到您期望的通知。
通过删除subscription.Dispose()
或using
语句,您的观察者应在订阅后20秒收到TimeoutException
。但是,由于Exception
也会取消订阅,因此您将只收到此Exception
一次。
此外,Timeout
操作符在订阅时启动超时,除非订阅被取消或源观察器完成,否则不会取消超时。
您可能需要尝试使用其他运算符,例如Throttle
。
observable.Throttle(TimeSpan.FromSeconds(20))
.Subscribe(x =>
Console.WriteLine("it has been 20 seconds since we received the last notification.")
)