我想知道是否有一种方法可以获取可观察流并使用*While运算符,特别是TakeWhile、SkipWhile和BufferWhile,这样当布尔'While'条件满时,它们的订阅者就不会收到.OnComplete?
当我开始使用.TakeWhile/SkipWhile和BufferWhile运算符时,我认为它们不会终止/.OnComplete(),而只是(不)在满足布尔条件时发出。
举一个例子可能更有意义:
我有一个bool标志,指示实例是否繁忙,以及一个可观察的数据流:
private bool IsBusy { get;set; }
private bool IgnoreChanges { get;set; }
private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }
并使用/设置类似于(简化的)的RX流
private void SetupRx()
{
ConsumerSubscription = Producer
.SkipWhile(_ => IgnoreChanges == true) // Drop the producer's stream of ints whenever the IgnoreChanges flag is set to true, but forward them whenever the IgnoreChanges flag is set to false
.BufferWhile(_ => IsBusy == true) // for all streamed instances buffer them as long as we are busy handling the previous one(s)
.Subscribe(i => DoSomething(i));
}
private void DoSomething(int i)
{
try
{
IsBusy = true;
// ... do something
}
finally
{
IsBusy = false;
}
}
每当IsBusy/IgnoreChanges标志从true切换到false并返回时,.SkipeWhile/.BufferWhile不应完成/OnComplete(..),但应保持流的活动状态。
在RX.Net开箱即用的情况下,这在某种程度上可行吗?和/或有人知道如何实现这一点吗?
要从IObservable<T>
源删除OnCompleted
消息,只需使用Observable.Never<T>()
:的Concat
source.TakeWhile(condition).Concat(Observable.Never<T>())
要手动订阅IObservable<T>
源,使订阅仅在手动取消订阅时结束,可以使用Publish
和IConnectableObservable<T>
:
var connectableSource = source.Publish();
// To subscribe to the source:
var subscription = connectableSource.Connect();
...
// To unsubscribe from the source:
subscription.Dispose();
尽管如此,我认为你处理这个问题的方式是错误的。如果做对了,你就不需要上面的技巧了。查看您的查询:
ConsumerSubscription = Producer
// Drop the producer's stream of ints whenever the IgnoreChanges flag
// is set to true, but forward them whenever the IgnoreChanges flag is set to false
.SkipWhile(_ => IgnoreChanges == true)
// For all streamed instances buffer them as long as we are busy
// handling the previous one(s)
.BufferWhile(_ => IsBusy == true)
.Subscribe(i => DoSomething(i));
您应该使用.Where(_ => !IgnoreChanges)
而不是.SkipWhile(_ => IgnoreChanges)
。
您应该将.Buffer(_ => IsBusy.SkipWhile(busy => busy))
与BehaviorSubject<bool> IsBusy
一起使用,而不是使用.BufferWhile(_ => IsBusy)
。
完整的代码如下所示:
private BehaviorSubject<bool> IsBusy { get;set; }
private bool IgnoreChanges { get;set; }
private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }
private void SetupRx()
{
ConsumerSubscription = Producer
.Where(_ => !IgnoreChanges)
.Buffer(_ => IsBusy.SkipWhile(busy => busy))
.Subscribe(buffer => DoSomething(buffer));
}
private void DoSomething(IList<int> buffer)
{
try
{
IsBusy.OnNext(true);
// Do something
}
finally
{
IsBusy.OnNext(false);
}
}
下一步的改进将是尝试摆脱BehaviorSubject<bool> IsBusy
。主题是你想要尽量避免的,因为它们是你必须管理的状态。