Rx GroupBy -如何获取分组值



基于React 2014上的一个"一切都是一个流"的演讲,并查看了Reactive Trader的源代码,我想我应该尝试重新编写一些旧代码来遵循这个模式,但我有点困惑。

我有两个签名如下的方法:
public static IObservable<OrderDto> GetOrderStream(string name)
public static IObservable<PriceDto> GetPriceStream(string exchange, string security)

两种方法都使用Observable.Create来封装一些事件,并使用Publish()RefCount()来创建Observable。

每个OrderDto包含用于交换和安全的字段。我想按Exchange和Security对订单进行分组,这样我就可以从单独的流请求定价信息。对于我的最终结果,我想打印每个订单以及订单中交易所/证券的当前价格。

对于订单,我有以下命令:

var orders = Observable.Defer(() => GetOrderStream("FNZCTEST"))
                                                .GroupBy(o => new { o.Exchange, o.Security })
                                                .Publish()
                                                .RefCount();

如果我使用:

var j = from order in orders
                from o in order
                from price in GetPriceStream(order.Key.Exchange, order.Key.Security).Materialize()
                select new { Order = o, Price = price };
IDisposable disposable = j.Subscribe(x => Console.WriteLine("{0} : {1}", x.Order, x.Price.HasValue ? x.Price.Value : new PriceDto()));

我得到了所需的输出,但是对于相同的Exchange/Security(即不是每个组一次)反复调用GetPriceStream

如果改成

var j = from order in orders
        from price in GetPriceStream(order.Key.Exchange, order.Key.Security).Materialize()
        select new { Price = price };
IDisposable disposable = j.Subscribe(x => Console.WriteLine("{0} : {1}", "", x.Price.HasValue ? x.Price.Value : new PriceDto()));

GetPriceStream按我所期望的为每个组调用一次。我的问题是-我如何获得这种期望的行为,并获得访问组中的每个OrderDto,以便我可以一起输出订单和价格。

这里发生的一些事情对我来说没有意义。我不明白你为什么要做.Defer(...),为什么要做.Publish().RefCount()

同样,你正在分组你的结果,然后使它们平坦化。为什么不让他们保持原样呢?

最后,看起来您正在使用.Materialize()以某种方式处理GetPriceStream不产生实际值的情况。

那么,考虑到所有这些,这似乎是你最可能需要的查询:

var query =
    from order in GetOrderStream("FNZCTEST")
    from price in GetPriceStream(order.Exchange, order.Security)
        .DefaultIfEmpty(new PriceDto())
    select new
    {
        Order = order,
        Price = price,
    };
IDisposable disposable =
    query.Subscribe(x => Console.WriteLine("{0} : {1}", x.Order, x.Price));

现在,也有可能你的Observable.Create的实现也给你带来了痛苦。所以,如果你可以的话如果你能在你的问题后面加上实现就太好了

相关内容

  • 没有找到相关文章

最新更新