"Neverending" TakeWhile、BufferWhile 和 SkipWhile RX.Net 序列



我想知道是否有一种方法可以获取可观察流并使用*While运算符,特别是TakeWhile、SkipWhile和BufferWhile,这样当布尔'While'条件满时,它们的订阅者就不会收到.OnComplete?

当我开始使用.TakeWhile/SkipWhile和BufferWhile运算符时,我认为它们不会终止/.OnComplete(),而只是(不)在满足布尔条件时发出。

举一个例子可能更有意义:

我有一个bool标志,指示实例是否繁忙,以及一个可观察的数据流:

private bool IsBusy { get;set; }
private bool IgnoreChanges { get;set; }
private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }

并使用/设置类似于(简化的)的RX流

private void SetupRx()
{
    ConsumerSubscription = Producer
        .SkipWhile(_ => IgnoreChanges == true) // Drop the producer's stream of ints whenever the IgnoreChanges flag is set to true, but forward them whenever the IgnoreChanges flag is set to false
        .BufferWhile(_ => IsBusy == true) // for all streamed instances buffer them as long as we are busy handling the previous one(s)
        .Subscribe(i => DoSomething(i));
}
private void DoSomething(int i)
{
    try
    {
        IsBusy = true;
        // ... do something
    }
    finally
    {
        IsBusy = false;
    }
}

每当IsBusy/IgnoreChanges标志从true切换到false并返回时,.SkipeWhile/.BufferWhile不应完成/OnComplete(..),但应保持流的活动状态。

在RX.Net开箱即用的情况下,这在某种程度上可行吗?和/或有人知道如何实现这一点吗?

要从IObservable<T>源删除OnCompleted消息,只需使用Observable.Never<T>():的Concat

source.TakeWhile(condition).Concat(Observable.Never<T>())

要手动订阅IObservable<T>源,使订阅仅在手动取消订阅时结束,可以使用PublishIConnectableObservable<T>:

var connectableSource = source.Publish();
// To subscribe to the source:
var subscription = connectableSource.Connect();
...
// To unsubscribe from the source:
subscription.Dispose();

尽管如此,我认为你处理这个问题的方式是错误的。如果做对了,你就不需要上面的技巧了。查看您的查询:

ConsumerSubscription = Producer
    // Drop the producer's stream of ints whenever the IgnoreChanges flag
    // is set to true, but forward them whenever the IgnoreChanges flag is set to false
    .SkipWhile(_ => IgnoreChanges == true) 
    // For all streamed instances buffer them as long as we are busy
    // handling the previous one(s)
    .BufferWhile(_ => IsBusy == true) 
    .Subscribe(i => DoSomething(i));

您应该使用.Where(_ => !IgnoreChanges)而不是.SkipWhile(_ => IgnoreChanges)

您应该将.Buffer(_ => IsBusy.SkipWhile(busy => busy))BehaviorSubject<bool> IsBusy一起使用,而不是使用.BufferWhile(_ => IsBusy)

完整的代码如下所示:

private BehaviorSubject<bool> IsBusy { get;set; }
private bool IgnoreChanges { get;set; }
private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }
private void SetupRx()
{
    ConsumerSubscription = Producer
        .Where(_ => !IgnoreChanges)
        .Buffer(_ => IsBusy.SkipWhile(busy => busy))
        .Subscribe(buffer => DoSomething(buffer));
}
private void DoSomething(IList<int> buffer)
{
    try
    {
        IsBusy.OnNext(true);
        // Do something
    }
    finally
    {
        IsBusy.OnNext(false);
    }
}

下一步的改进将是尝试摆脱BehaviorSubject<bool> IsBusy。主题是你想要尽量避免的,因为它们是你必须管理的状态。

相关内容

  • 没有找到相关文章

最新更新