.NET反应性扩展:删除重播



在下面的代码中,我正在使用Replay()来避免使用stuff [...]时的价格更新损失。stuff[...]完成后,我需要发送这些价格更新。

var observable = Observable.FromEventPattern<Price>(h => book.PriceUpdated += h, h => book.PriceUpdated -= h)
                           .Replay();
observable.Connect();
// Do some stuff [...]
observable.Select(p => Observable.FromAsync(() => SendPriceUpdateAsync(p.Sender, p.EventArgs, socket)))
                      .Concat()
                      .Subscribe();

我想在完成stuff [...]和发送价格后删除Replay()(我不希望存储所有值,并且一旦发送它们,我就不再需要这些值(。

有没有简单的方法?

因此,您实际想要的是延迟所有事件,直到完成做一些事情。我建议完全不使用重播。

而是尝试这样的方法:

var observable = eventObservable
.TakeUntil(Observable.Start(() => { /* Do some stuff [...] */ })) //collect only events while processing, completes the observable when done processing
.ToList() //with ToList only propagate all at once on completion
.SelectMany(list => list.ToObservable()) //unfold the list to single events again
.Concat(eventObservable); //continue with normal events observable

,所以我找不到与操作员一起做的任何方法,但是我们总是可以创建自己的可连接可观察类!:(

public class UntilSubscribeReplay<T> : IObservable<T>
{
    private readonly IObservable<T> _parent;
    private HashSet<IObserver<T>> _observers = new HashSe
    private IDisposable _subscription;
    private readonly Queue<T> _itemsQueue = new Queue<T>(
    public UntilSubscribeReplay(IObservable<T> parent)
    {
        _parent = parent;
        Subscribe();
    }
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (_subscription == null)
            Subscribe();
        _observers.Add(observer);
        EmitQueuedItems();
        return Disposable.Create(() =>
        {
            _observers.Remove(observer);
            if (_observers.Count == 0)
                _subscription.Dispose();
        });
    }
    private void Subscribe()
    {
        _subscription = _parent
            .Do(_itemsQueue.Enqueue)
            .Subscribe(_ => EmitQueuedItems());
    }
    private void EmitQueuedItems()
    {
        if (_observers.Count == 0)
            return;
        while (_itemsQueue.TryDequeue(out T item))
        {
            foreach (IObserver<T> observer in _observers)
            {
                observer.OnNext(item);
            }
        }
    }
}

当然,为您的方便起见,一种扩展方法。:(

public static class MyObservableExtensions
{
    public static IObservable<T> UntilSubscribeReplay<T>(this IObservable<T> source) 
        => new UntilSubscribeReplay<T>(source);
}

这就是您的使用方式:

IObservable<int> myReplay = myObservable.UntilSubscribeReplay();
// Do Stuff
myReplay.Subscribe(value => HandleValue(value));

请注意,无需使用连接。适当的方法是将其作为可连接的可观察到(避免使用扩展时避免内存泄漏,但最终不订阅(,但是对于您的具体情况,它应该足够了。

让我知道情况如何。:(

相关内容

  • 没有找到相关文章

最新更新