我在TCP上下文中工作,我需要对传入数据做出反应:如果接收到太多数据,我需要忽略除最后接收到的所有数据;如果很长一段时间没有数据进来,我必须向服务器发送请求。
第一个命题是这样求解的:
Observable.FromEventPattern<ObjectReceivedEventArgs>( _client, "ObjectReceived" )
.Throttle( TimeSpan.FromMilliseconds( 500 ) )
.Subscribe( args => ... );
第二个命题用Timer来解决:
Observable.Timer( ... ).Subscribe( ... );
现在,我可以将这两件事混合在一起,这样如果没有数据传入,我就可以及时地向服务器发送请求。
是的,您可以像这样对源事件进行二次订阅:
Observable.FromEventPattern<ObjectReceivedEventArgs>(_client, "ObjectReceived")
.Select(_ => Unit.Default)
.StartWith(Unit.Default)
.Throttle(TimeSpan.FromSeconds(/*desired timeout here */))
.Take(1).Repeat()
.Subscribe(_ => /* poke server here */);
这所做的是用一个Unit值启动一个流,并试图通过事件的到来来限制它。只要事件在超时时间内到达,流就会被节流抑制,只要事件在时间范围内暂停,那么OnNext
就会被触发。请注意,即使没有进一步的事件到达,Take(1)
也会在每次后续暂停后导致超时持续触发—如果您只想在事件停止后调用服务器一次,只需删除它即可。