使用RX编写命令总线



我对RX越来越熟悉,作为我的实验项目,我正在尝试创建一个概念上类似于以下的简单命令总线:

class Bus
{
    Subject<Command> commands;
    IObservable<Invocation> invocations;
    public Bus()
    {
        this.commands = new Subject<Command>();
        this.invocations = commands.Select(x => new Invocation { Command = x }).Publish();
    }
    public IObserver<Command> Commands
    {
        get { return this.commands; }
    }
    public IObservable<Invocation> Invocations
    {
        get { return this.invocations; }
    }
}
class Invocation
{
    public Command Command { get; set; }
    public bool Handled { get; set; }
}

其思想是,模块可以在启动时使用Invocations属性安装命令处理程序,并可以对其订阅应用他们希望的任何筛选。另一方面,客户端可以通过调用Commands.OnNext(command)来触发命令执行。

但是,我希望总线能够保证提交的每个命令都将由一个处理程序来处理。也就是说,理想情况下,OnNext处理应在第一个处理程序设置Invocation后立即终止。如果在OnNext()结束时,Invocation.Handled仍然为false,则Handled为true AND应抛出异常。

我试着创建自己的ISubject、IObservable和IObserver实现,但这感觉"又脏又便宜";)

我正在努力让我的头脑围绕RX提供的合成能力。以一种组合的方式,我如何提供"精确一次"的保证?

感谢您提供的任何见解。

实际上,您通常都有正确的想法。你只需要做实际的调度。为此,SelectMany将有所帮助:

class Bus
{
    Subject<Command> commands;
    Subject<Invocation> invocations;
    // TODO: Instantiate me
    List<Func<Command, bool>> handlerList; 
    public Bus()
    {
        this.commands = new Subject<Command>();
        this.invocations = new Subject<Invocation>();
        commands.SelectMany(x => {
            // This FirstOrDefault() is just good ol' LINQ
            var passedHandler = 
                handlerList.FirstOrDefault(handler => handler(x) == true);
            return passedHandler != null ?
                Observable.Return(new Invocation() { Command = x, Handled = true}) :
                Observable.Throw<Invocation>(new Exception("Unhandled!"));
        }).Multicast(invocations).Connect();
    }
    /* ... snip ... */
}

但是,老实说,这并不能真正展示Rx的强大功能,因为它正在同步执行处理程序列表。让我们通过使其完全无阻塞来使其更加引人注目。

首先,我们将把Func原型更改为Func<Command, IObservable<Invocation>>。这意味着,一个方法接受一个命令并产生一个Future Invocation结果(a-la Task<T>)。然后,我们可以通过这个选择器获得相同的行为,但我们的处理程序是异步的(通过TextArea提前编码):

commands.SelectMany(x =>
    handlerList.ToObservable()
        .Select(h => Observable.Defer(() => h(x)))
        .Concat()
        .SkipWhile(x => x.Handled == false)
        .TakeLast(1))
    .Multicast(invocations).Connect();

这是Rx的一个相当高级的使用,但我们的想法是,对于每个命令,我们将首先创建一个处理程序流,并按顺序运行它们(这就是Defer+Concat所做的),直到我们找到一个Handled为true的处理程序,然后取最后一个。

外部SelectMany将一个命令流选择为一个未来结果流(即类型为IO<IO<Invocation>>,然后将其平坦化,因此它成为一个结果流。

从来没有阻塞,非常简洁,100%可测试,类型安全的代码,只是表达了一个非常复杂的想法,如果强行编写的话,会非常难看。这就是为什么Rx很酷。

虽然你可以制作一个"只需一次"的主题,但你不应该。接口(以及库中的所有运算符)意味着将通知所有观察者(忽略OnNext调用中出现异常的可能性)。

你可以做的是创建一组替代的接口来定义你想要的语义:

interface IHandlableObservable<T>
{
    //gets first chance at the notification
    IDisposable SubscribeFirst(IHandlingObserver<T> observer);
    //gets last chance at the notification
    IDisposable SubscribeLast(IHandlingObserver<T> observer);
    //starts the notification (possibly subscribing to an underlying IObservable)
    IDisposable Connect();
}
interface IHandlingObserver<T>
{
    //return indicates if the observer "handled" the value
    bool OnNext(T value);
    void OnError(Exception ex);
    void OnCompleted();
}

然后,您可以定义允许将常规可观测值转换为可处理可观测值的方法,以便将大部分逻辑保留在标准RX运算符中。

最新更新