在下面的代码中,我正在使用Replay()
来避免使用stuff [...]
时的价格更新损失。stuff[...]
完成后,我需要发送这些价格更新。
var observable = Observable.FromEventPattern<Price>(h => book.PriceUpdated += h, h => book.PriceUpdated -= h)
.Replay();
observable.Connect();
// Do some stuff [...]
observable.Select(p => Observable.FromAsync(() => SendPriceUpdateAsync(p.Sender, p.EventArgs, socket)))
.Concat()
.Subscribe();
我想在完成stuff [...]
和发送价格后删除Replay()
(我不希望存储所有值,并且一旦发送它们,我就不再需要这些值(。
有没有简单的方法?
因此,您实际想要的是延迟所有事件,直到完成做一些事情。我建议完全不使用重播。
而是尝试这样的方法:
var observable = eventObservable
.TakeUntil(Observable.Start(() => { /* Do some stuff [...] */ })) //collect only events while processing, completes the observable when done processing
.ToList() //with ToList only propagate all at once on completion
.SelectMany(list => list.ToObservable()) //unfold the list to single events again
.Concat(eventObservable); //continue with normal events observable
,所以我找不到与操作员一起做的任何方法,但是我们总是可以创建自己的可连接可观察类!:(
public class UntilSubscribeReplay<T> : IObservable<T>
{
private readonly IObservable<T> _parent;
private HashSet<IObserver<T>> _observers = new HashSe
private IDisposable _subscription;
private readonly Queue<T> _itemsQueue = new Queue<T>(
public UntilSubscribeReplay(IObservable<T> parent)
{
_parent = parent;
Subscribe();
}
public IDisposable Subscribe(IObserver<T> observer)
{
if (_subscription == null)
Subscribe();
_observers.Add(observer);
EmitQueuedItems();
return Disposable.Create(() =>
{
_observers.Remove(observer);
if (_observers.Count == 0)
_subscription.Dispose();
});
}
private void Subscribe()
{
_subscription = _parent
.Do(_itemsQueue.Enqueue)
.Subscribe(_ => EmitQueuedItems());
}
private void EmitQueuedItems()
{
if (_observers.Count == 0)
return;
while (_itemsQueue.TryDequeue(out T item))
{
foreach (IObserver<T> observer in _observers)
{
observer.OnNext(item);
}
}
}
}
当然,为您的方便起见,一种扩展方法。:(
public static class MyObservableExtensions
{
public static IObservable<T> UntilSubscribeReplay<T>(this IObservable<T> source)
=> new UntilSubscribeReplay<T>(source);
}
这就是您的使用方式:
IObservable<int> myReplay = myObservable.UntilSubscribeReplay();
// Do Stuff
myReplay.Subscribe(value => HandleValue(value));
请注意,无需使用连接。适当的方法是将其作为可连接的可观察到(避免使用扩展时避免内存泄漏,但最终不订阅(,但是对于您的具体情况,它应该足够了。
让我知道情况如何。:(