如何订阅一个IObservable,而不是缓存数据,直到另一个IObservable发布



我想:

  1. 立即订阅IObservable<T>,但立即开始缓冲收到的任何T(即我的IObserver<T>尚未看到)。
  2. 当工作完成时,将缓冲区刷新到我的IObserver<T>并继续

订阅是第一件很重要的事。

在一个"大理石图"的形式,我之后的东西像这样…

Time                  T+1   2   3   4   5   6   7   8
s1:IObservable<int>     1   2   3   4   5   6   7   8  
s2:IObservable<bool>          t    
r: IObservable<int>           1 3   4   5   6   7   8
                              2

…在T+1,我订阅一个IObservable<bool> r,它本身依赖于IObservable<int> s1IObservable<bool> s2s1是我不控制的流,s2是我控制的流(主题),publish是工作完成时的流。

我认为SkipUntil会帮助我,但这并不能缓冲在依赖的IObservable完成之前收到的事件。

这是一些我认为可以工作的代码,但由于SkipUntil不是缓冲区而不能。

        var are = new AutoResetEvent(false);
        var events = Observable.Generate(1, i => i < 12, i => i + 1, i => i, i => TimeSpan.FromSeconds(1));
        events.Subscribe(x => Console.WriteLine("events:" + x), () => are.Set());
        var subject = new Subject<int>();
        var completed = subject.AsObservable().Delay(TimeSpan.FromSeconds(5));
        Console.WriteLine("Subscribing to events...");
        events.SkipUntil(completed).Subscribe(x=> Console.WriteLine("events.SkipUntil(completed):"+ x));
        Console.WriteLine("Subscribed.");
        completed.Subscribe(x => Console.WriteLine("Completed"));
        subject.OnNext(10);
        are.WaitOne();
        Console.WriteLine("Done");

我知道各种Buffer方法,但它们在这种情况下似乎不合适,因为我在这里没有真正缓冲,只是在订阅开始时协调活动。

我将Enigmativity的响应概括为以下可能有用的扩展方法:

public static class ObservableEx
{
    public static IObservable<TSource> BufferUntil<TSource, TCompleted>(this IObservable<TSource> source, IObservable<TCompleted> completed)
    {
        var observable = Observable.Create<TSource>(o =>
        {
            var replaySubject = new ReplaySubject<TSource>();
            var sub1 = source.Subscribe(replaySubject);
            var query =
                completed.Take(1).Select(
                    x => replaySubject.AsObservable());
            var sub2 = query.Switch().Subscribe(o);
            return new CompositeDisposable(sub1, sub2);
        });
        return observable;
    }        
}

这个适合我:

var r = Observable.Create<int>(o =>
{
    var rs = new ReplaySubject<int>();
    var subscription1 = s1.Subscribe(rs);
    var query = from f in s2.Take(1) select rs.AsObservable();
    var subscription2 = query.Switch().Subscribe(o);
    return new CompositeDisposable(subscription1, subscription2);
});

相关内容

  • 没有找到相关文章

最新更新