向IObserver应用订阅者策略—当订阅者不需要时,为Subscribe()返回什么



考虑具有以下Subscribe实现的IObservable<T>:

public IDisposable Subscribe(IObserver<T> observer)
{
    if (observer == null)
    {
        throw new ArgumentNullException("observer");
    }
    lock (_subscriberSync)
    {
        var accepted = OnSubscribing(observer);   // <------ policy can be applied here
        if (!accepted)
        {
            /* #1 */ return null;
            /* #2 */ // return NullSubscription.Instance;
            /* #3 */ // throw new SubscriptionRejectedException("reason");
        }
        // ... perform actual subscription went here
    }
}

似乎没有任何关于如何建立拒绝订阅的指导。理想情况下,我们有一个Boolean TrySubscribe(IObserver<T> observer, out IDisposable subscription)Maybe<IDisposable> Subscribe(IObserver<T> observer)来表示条件,但似乎我们只有一个标志值或带外异常的选项。每个都有缺点:

对于#1,似乎我遇到的所有代码都不检查null,甚至Resharper静态分析也在其上添加了NotNull属性。

对于#2,我必须测试NullSubscription,它与#1没有太大的不同,除了它不是很容易发现(典型的"魔术"返回值)。

对于#3,我们有用于特殊情况的典型流程,但对于延迟订阅,它使调试复杂化。

除了你已经实现的这三个方法之外,还有其他的方法吗?

observer.onError(new SomeException())怎么了

而且,看起来您正在手动实现IObservable<T>,这是不建议的。如果你必须这样做,你可以这样做:

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_subscriberSync)
        {
            var accepted = OnSubscribing(observer);   // <------ policy can be applied here
            if (!accepted)
            {
                observer.OnError(new SubscriptionRejectedException("reason"));
                return Disposable.Empty;
            }
            // ... perform actual subscription went here
        }
    }

我假设你有一个很好的理由来实现IObservable<T>,所以我们将忽略它-所以如果我正确地阅读了你的问题,你想知道在"订阅失败"的情况下返回什么,它与链接的Rx查询逻辑的其余部分很好地融合在一起……嗯……嗯,我可能会使用Disposable.Create方法:

public class MyObservable<T> : IObservable<T>
{
    private static object _subscriberSync = new object();
    private IObservable<T> _source;
    public MyObservable(IObservable<T> source)
    {
        _source = source;
    }
    protected virtual bool OnSubscribing(IObserver<T> obs)
    {
        return false;
    }
    public void DohSubscriptionFailed()
    {
        // Whatever the heck you want to do here?
        Console.WriteLine("Sorry, you never got a subscription");
    }
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }
        lock (_subscriberSync)
        {
            var accepted = OnSubscribing(observer);   // <------ policy can be applied here
            if (!accepted)
            {
                return Disposable.Create(() => DohSubscriptionFailed());
            }
            // ... perform actual subscription went here
            return _source.Subscribe(observer);
        }
    }
}

使用和输出:

void Main()
{
    // A faked up source
    var source = new Subject<bool>();
    var query = new MyObservable<bool>(source);
    using(query.Subscribe(Console.WriteLine))
    {
        // nothing on output...
        source.OnNext(true);
        // still nothing on output...
        source.OnNext(false);
        Console.ReadLine();
    }
    // dispose fires, outputs "Sorry, you never got a subscription"
}

现在,正如这里所写的,DohSubscriptionFailed没有做太多有用的事情,但是根据您的整个用例,您可以在那里触发任何您想要的东西。

最新更新