我有一个WCF心跳服务,接受来自客户端的保持活动。如果"保持活动"未在指定时间内到达,则会引发超时并采取一些操作。现在,我正在尝试简化使用System.Threading.Timer的现有代码,并将其替换为基于RX的解决方案。我缺少的是如何将WCF服务方法类转换为可观察的。我考虑了以下几点:
- 在作为Subscribe参数传递的WCF服务和缓存观察器中实现IObservable可能不是一个好主意,代码也很难看
-
像这样使用主题:
this works, but probably using subjects in production code is not a good idea.Subject s1 = new Subject(); s1.Timeout(TimeSpan.FromSeconds(3)) .Subscribe(_ => { }, ex => Console.WriteLine("Timeout at " + DateTime.Now));
Console.WriteLine("Hertbeat at: " + DateTime.Now); s1.OnNext(Unit.Default); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("Hertbeat at: " + DateTime.Now); s1.OnNext(Unit.Default);
- Use events in service's implementation:
class HeartbeatService { public void Heartbeat() { OnHeartbeatArrived(); }
public event Action HeartbeatArrived;
protected virtual void OnHeartbeatArrived() { Action handler = HeartbeatArrived; if (handler != null) handler(Unit.Default); } }
and then use RX
HeartbeatService heartbeatService = new HeartbeatService();
Observable.FromEvent<Unit>(action => heartbeatService.HeartbeatArrived += action,
action => heartbeatService.HeartbeatArrived -= action)
.Timeout(TimeSpan.FromSeconds(3))
.Subscribe(_ => { },
ex => Console.WriteLine("Timeout " + ex + ", at " + DateTime.Now));
Console.WriteLine("Heartbeat at: " + DateTime.Now);
heartbeatService.Heartbeat();
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine("Heartbeat at: " + DateTime.Now);
heartbeatService.Heartbeat();
Console.WriteLine("Should timeout now");
我也不喜欢这个解决方案,WCF服务中的事件似乎是多余的。在RX中有没有一个好的方法可以做到这一点,或者我可能试图在这里滥用RX?
Throttle
运算符是Rx中心跳算法的关键部分。基本思想是建立一个包含警报的流,只要心跳到达,警报就会被抑制。更多详细信息,请参阅我的博客文章:http://www.zerobugbuild.com/?p=230