如何判断 Subject.OfType<T> 是否有任何订阅者



,所以我试图使用rx.net进行一些小消息总线。

public class Bus {
    private readonly Subject<BaseCommand> _commands = new Subject<BaseComand>();
    public void RegisterHandler<TCommand>(Action<TCommand> handler) where TCommand: BaseCommand {
        _commands
            .OfType&lt;TCommand&gt;()
            .Publish()
            .RefCount()
            .Subscribe(handler);
    }
    public void SendCommand<TCommand>(TCommand command) where TCommand: BaseCommand {
        _commands.OnNext(command);
    }
}

所以这是代码的要旨。我想限制订阅,以便只有一个消息类型可以存在一个订阅。无论如何,是否可以在添加新订阅之前从OfType<T>检查可观察的任何现有订阅?

我会建议这样的事情(我已经更改了您的registerHandler以具有可iDisposable的返回类型,因此您实际上可以再次取消订阅(:

public class Bus
{
    private readonly Subject<BaseCommand> _commands = new Subject<BaseCommand>();
    private class Counter<TCommand> where TCommand : BaseCommand
    {
        public static int Count;
    }
    public IDisposable RegisterHandler<TCommand>(Action<TCommand> handler, Action<Exception> OnError = null) where TCommand : BaseCommand
    {
        OnError = OnError ?? (Action<Exception>)((ex) => Dispatcher.CurrentDispatcher.Invoke(() => {throw ex; })); // alternative case of course only works if dispatcher is available
        return Observable.Create<TCommand>(o =>
        {
            if (Interlocked.Increment(ref Counter<TCommand>.Count) > 1)
            {
                Interlocked.Decrement(ref Counter<TCommand>.Count);
                o.OnError(new InvalidOperationException("Too many subscribers!"));
                return Disposable.Empty;
            }
            var subscription = _commands
                .OfType<TCommand>()
                      .Publish()
                      .RefCount()
                      .Subscribe(o);
            var decrement = Disposable.Create(() =>
            {
                Interlocked.Decrement(ref Counter<TCommand>.Count);
            });
            return new CompositeDisposable(subscription, decrement);
        })
        .Subscribe(handler, OnError);
    }
    public void SendCommand<TCommand>(TCommand command) where TCommand : BaseCommand
    {
        _commands.OnNext(command);
    }
}

编辑:我可能会将您的寄存器功能的签名更改为

 public IObservable<TCommand> RegisterHandler<TCommand>() where TCommand : BaseCommand

不过;通过错误管理节省一些麻烦(订户必须亲自照顾这一点(,您的消费者在何时以及如何订阅这些事件的方式上更加自由。

相关内容

  • 没有找到相关文章

最新更新