基于React 2014上的一个"一切都是一个流"的演讲,并查看了Reactive Trader的源代码,我想我应该尝试重新编写一些旧代码来遵循这个模式,但我有点困惑。
我有两个签名如下的方法:public static IObservable<OrderDto> GetOrderStream(string name)
public static IObservable<PriceDto> GetPriceStream(string exchange, string security)
两种方法都使用Observable.Create
来封装一些事件,并使用Publish()
和RefCount()
来创建Observable。
每个OrderDto包含用于交换和安全的字段。我想按Exchange和Security对订单进行分组,这样我就可以从单独的流请求定价信息。对于我的最终结果,我想打印每个订单以及订单中交易所/证券的当前价格。
对于订单,我有以下命令:
var orders = Observable.Defer(() => GetOrderStream("FNZCTEST"))
.GroupBy(o => new { o.Exchange, o.Security })
.Publish()
.RefCount();
如果我使用:
var j = from order in orders
from o in order
from price in GetPriceStream(order.Key.Exchange, order.Key.Security).Materialize()
select new { Order = o, Price = price };
IDisposable disposable = j.Subscribe(x => Console.WriteLine("{0} : {1}", x.Order, x.Price.HasValue ? x.Price.Value : new PriceDto()));
我得到了所需的输出,但是对于相同的Exchange/Security(即不是每个组一次)反复调用GetPriceStream
。
如果改成
var j = from order in orders
from price in GetPriceStream(order.Key.Exchange, order.Key.Security).Materialize()
select new { Price = price };
IDisposable disposable = j.Subscribe(x => Console.WriteLine("{0} : {1}", "", x.Price.HasValue ? x.Price.Value : new PriceDto()));
则GetPriceStream
按我所期望的为每个组调用一次。我的问题是-我如何获得这种期望的行为,并获得访问组中的每个OrderDto,以便我可以一起输出订单和价格。
这里发生的一些事情对我来说没有意义。我不明白你为什么要做.Defer(...)
,为什么要做.Publish().RefCount()
。
同样,你正在分组你的结果,然后使它们平坦化。为什么不让他们保持原样呢?
最后,看起来您正在使用.Materialize()
以某种方式处理GetPriceStream
不产生实际值的情况。
那么,考虑到所有这些,这似乎是你最可能需要的查询:
var query =
from order in GetOrderStream("FNZCTEST")
from price in GetPriceStream(order.Exchange, order.Security)
.DefaultIfEmpty(new PriceDto())
select new
{
Order = order,
Price = price,
};
IDisposable disposable =
query.Subscribe(x => Console.WriteLine("{0} : {1}", x.Order, x.Price));
现在,也有可能你的Observable.Create
的实现也给你带来了痛苦。所以,如果你可以的话如果你能在你的问题后面加上实现就太好了