如何创建一个Rx可观察对象,它在最后一个观察者退订时停止发布事件



我将创建一个可观察对象(通过各种方式)并将其返回给感兴趣的各方,但是当他们完成侦听后,我想删除该可观察对象,以便它不再继续消耗资源。另一种理解方式是在发布子系统中创建主题。当不再有人订阅某个主题时,您就不想再保留该主题及其过滤了。

Rx已经有一个操作符来满足您的需求-实际上是两个- Publish &RefCount .

下面是如何使用它们:

IObservable xs = ...
var rxs = xs.Publish().RefCount();
var sub1 = rxs.Subscribe(x => { });
var sub2 = rxs.Subscribe(x => { });
//later
sub1.Dispose();
//later 
sub2.Dispose();
//The underlying subscription to `xs` is now disposed of.

简单。

如果我理解了你的问题,你想要创建一个可观察对象,这样当所有订阅者都处理了他们的订阅,即没有更多的订阅者,然后你想要执行一个清理函数,这将阻止可观察对象产生更多的值。如果这是你想要的,那么你可以这样做:

//Wrap a disposable
public class WrapDisposable : IDisposable
    {
        IDisposable disp;
        Action act;
        public WrapDisposable(IDisposable _disp, Action _act)
        {
            disp = _disp;
            act = _act;
        }
        void IDisposable.Dispose()
        {
            act();
            disp.Dispose();
        }
    }
    //Observable that we want to clean up after all subs are done
    public static IObservable<long> GenerateObs(out Action cleanup)
    {
        cleanup = () =>
        {
            Console.WriteLine("All subscribers are done. Do clean up");
        };
        return Observable.Interval(TimeSpan.FromSeconds(1));
    }
    //Wrap the observable
    public static IObservable<T> WrapToClean<T>(IObservable<T> obs, Action onAllDone)
    {
        int count = 0;
        return Observable.CreateWithDisposable<T>(ob =>
        {
            var disp = obs.Subscribe(ob);
            Interlocked.Increment(ref count);
            return new WrapDisposable(disp,() =>
            {
                if (Interlocked.Decrement(ref count) == 0)
                {
                    onAllDone();                                                
                }
            });
        });
    }

//用法的例子:

Action cleanup;
var obs = GenerateObs(out cleanup);
var newObs = WrapToClean(obs, cleanup);
newObs.Take(6).Subscribe(Console.WriteLine);
newObs.Take(5).Subscribe(Console.WriteLine);

相关内容

  • 没有找到相关文章

最新更新