我想:
- 立即订阅
IObservable<T>
,但立即开始缓冲收到的任何T
(即我的IObserver<T>
尚未看到)。 - 当工作完成时,将缓冲区刷新到我的
IObserver<T>
并继续
订阅是第一件很重要的事。
在一个"大理石图"的形式,我之后的东西像这样…
Time T+1 2 3 4 5 6 7 8
s1:IObservable<int> 1 2 3 4 5 6 7 8
s2:IObservable<bool> t
r: IObservable<int> 1 3 4 5 6 7 8
2
…在T+1,我订阅一个IObservable<bool>
r
,它本身依赖于IObservable<int>
s1
和IObservable<bool>
s2
。s1
是我不控制的流,s2
是我控制的流(主题),publish
是工作完成时的流。
我认为SkipUntil
会帮助我,但这并不能缓冲在依赖的IObservable
完成之前收到的事件。
这是一些我认为可以工作的代码,但由于SkipUntil
不是缓冲区而不能。
var are = new AutoResetEvent(false);
var events = Observable.Generate(1, i => i < 12, i => i + 1, i => i, i => TimeSpan.FromSeconds(1));
events.Subscribe(x => Console.WriteLine("events:" + x), () => are.Set());
var subject = new Subject<int>();
var completed = subject.AsObservable().Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("Subscribing to events...");
events.SkipUntil(completed).Subscribe(x=> Console.WriteLine("events.SkipUntil(completed):"+ x));
Console.WriteLine("Subscribed.");
completed.Subscribe(x => Console.WriteLine("Completed"));
subject.OnNext(10);
are.WaitOne();
Console.WriteLine("Done");
我知道各种Buffer
方法,但它们在这种情况下似乎不合适,因为我在这里没有真正缓冲,只是在订阅开始时协调活动。
我将Enigmativity的响应概括为以下可能有用的扩展方法:
public static class ObservableEx
{
public static IObservable<TSource> BufferUntil<TSource, TCompleted>(this IObservable<TSource> source, IObservable<TCompleted> completed)
{
var observable = Observable.Create<TSource>(o =>
{
var replaySubject = new ReplaySubject<TSource>();
var sub1 = source.Subscribe(replaySubject);
var query =
completed.Take(1).Select(
x => replaySubject.AsObservable());
var sub2 = query.Switch().Subscribe(o);
return new CompositeDisposable(sub1, sub2);
});
return observable;
}
}
这个适合我:
var r = Observable.Create<int>(o =>
{
var rs = new ReplaySubject<int>();
var subscription1 = s1.Subscribe(rs);
var query = from f in s2.Take(1) select rs.AsObservable();
var subscription2 = query.Switch().Subscribe(o);
return new CompositeDisposable(subscription1, subscription2);
});