我想解决以下问题:a)订阅者从IObservable接收事件一段时间。然后退订,做一些事情,然后再订阅。这里,它应该从执行退订的同一点开始接收事件。b)这种行为对于多订阅者模型是可取的。例如,当一个人取消订阅时,其他人应该继续接收事件。
RX方面有什么建议吗?
提前感谢!
这里有一个相当简单的Rx方法来做你想做的事情,这是从我对另一个问题的回答中复制的。我创建了一个名为Pausable
的扩展方法,它接受一个源可观察对象和一个布尔值的第二个可观察对象,用于暂停或恢复可观察对象。
public static IObservable<T> Pausable<T>(
this IObservable<T> source,
IObservable<bool> pauser)
{
return Observable.Create<T>(o =>
{
var paused = new SerialDisposable();
var subscription = Observable.Publish(source, ps =>
{
var values = new ReplaySubject<T>();
Func<bool, IObservable<T>> switcher = b =>
{
if (b)
{
values.Dispose();
values = new ReplaySubject<T>();
paused.Disposable = ps.Subscribe(values);
return Observable.Empty<T>();
}
else
{
return values.Concat(ps);
}
};
return pauser.StartWith(false).DistinctUntilChanged()
.Select(p => switcher(p))
.Switch();
}).Subscribe(o);
return new CompositeDisposable(subscription, paused);
});
}
可以这样使用:
var xs = Observable.Generate(
0,
x => x < 100,
x => x + 1,
x => x,
x => TimeSpan.FromSeconds(0.1));
var bs = new Subject<bool>();
var pxs = xs.Pausable(bs);
pxs.Subscribe(x => { /* Do stuff */ });
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
听起来您需要一个"可暂停"的流。假设一次只有一个订阅者处理这些值(而其他订阅者只是等待),这种解决方案可能就是您所需要的。