我正在尝试为每个符号创建两个管道。每个交易品种将根据两个时间帧Buffer
数据,并在每个时间帧上执行DoCalc
。
priceChangedObservable = Observable.FromEvent<QuoteChangeEvent, IQuote>(handler =>
{
QuoteChangeEvent qHandler = (e) =>
{
handler(e);
};
return qHandler;
},
qHandler => bapi.MAPI.OnQuoteChange += qHandler,
qHandler => bapi.MAPI.OnQuoteChange -= qHandler
);
如果我执行以下操作:
var els = new EventLoopScheduler();
var dispatcher = new EventLoopScheduler();
var multiCastStream = Observable.Publish(priceChangedObservable);
int timeFrame = 60;
multiCastStream
.GroupBy(instrument => instrument.Symbol)
.SelectMany(q => q)
.Buffer(TimeSpan.FromSeconds(timeFrame))
.Where(messages => messages.Any())
.SubscribeOn(els)
.ObserveOn(dispatcher)
.Select((sr) => DoCalc(sr, timeFrame))
.Subscribe((en) => { if (null != en) Console.WriteLine(en); });
// Start the producer
multiCastStream.Connect();
一切都按照我的预期工作。如果我注释掉上面的代码并在 multiCastStream.Connect()
语句之前添加第二个时间范围:
int secondTimeFrame = 300;
multiCastStream
.GroupBy(instrument => instrument.Symbol)
.SelectMany(q => q)
.Buffer(TimeSpan.FromSeconds(secondTimeFrame))
.Where(messages => messages.Any())
.SubscribeOn(els)
.ObserveOn(dispatcher)
.Select((sr) => DoCalc(sr, secondTimeFrame))
.Subscribe((en) => { if (null != en) Console.WriteLine(en); });
这也按预期工作。但是,如果我运行两个代码,则会出现意外行为。
分享Hot Observables
时我缺少一些基本的东西吗?
编辑 1
使用 Aron 的答案修改代码后,我得到:
Number of quotes 1
60: 6/20/2019 10:53:26 PM=> M2KU9 Stats.
Number of quotes 1
300: 6/20/2019 10:53:26 PM=> M2KU9 Stats.
Number of quotes 40
60: 6/20/2019 10:54:26 PM=> MNQU9 Stats.
然后,不会打印其他统计信息。
一切都按预期工作,你的逻辑有缺陷。为了演示,我将绘制一个Rx大理石图
Quote -----------x--------------x-----------x-----------x--
Buffer1 | | | | | | | | | | | | | |
Buffer2 | | | | | | |
Where 1 ------------x---------------x-----------x-----------x
Where 2 ----------------------------------x-------x-----------
^Notice It seems like there should be an event on 2
But there wasn't?
相反,您可能要同步这些
var synchronizationSource = Observerable.Timer(TimeSpan.FromSeconds(1),TimeSpan.FromSeconds(1))
.Publish()
using(synchronizationSource.Connect())
{
IObservable<Stat> CreateTimeFrame(int seconds)
{
var bufferTimeFrame = synchronizationSource
.Where(i => i % seconds == 0);
return priceChangedObservable
.GroupBy(instrument => instrument.Symbol)
.SelectMany(q => q)
.Buffer(bufferTimeFrame)
.Where(messages => messages.Any())
.ObserveOn(dispatcher)
.Select((sr) => DoCalc(sr, timeFrame))
}
CreateTimeFrame(60)
.Subscribe((en) => { if (null != en) Console.WriteLine(en); });
CreateTimeFrame(300)
.Subscribe((en) => { if (null != en) Console.WriteLine(en); });
Thread.Sleep(100000);
}