热可观察时遇到问题



我正在尝试为每个符号创建两个管道。每个交易品种将根据两个时间帧Buffer数据,并在每个时间帧上执行DoCalc

priceChangedObservable = Observable.FromEvent<QuoteChangeEvent, IQuote>(handler =>
    {
        QuoteChangeEvent qHandler = (e) =>
        {
            handler(e);
        };
        return qHandler;
    },
    qHandler => bapi.MAPI.OnQuoteChange += qHandler,
    qHandler => bapi.MAPI.OnQuoteChange -= qHandler
    );

如果我执行以下操作:

var els = new EventLoopScheduler();
var dispatcher = new EventLoopScheduler();
   var multiCastStream = Observable.Publish(priceChangedObservable);
    int timeFrame = 60;
     multiCastStream
    .GroupBy(instrument => instrument.Symbol)
    .SelectMany(q => q)            
    .Buffer(TimeSpan.FromSeconds(timeFrame))
    .Where(messages => messages.Any())
    .SubscribeOn(els)
    .ObserveOn(dispatcher)
    .Select((sr) => DoCalc(sr, timeFrame))
    .Subscribe((en) => { if (null != en) Console.WriteLine(en); });
    // Start the producer
    multiCastStream.Connect();

一切都按照我的预期工作。如果我注释掉上面的代码并在 multiCastStream.Connect() 语句之前添加第二个时间范围:

    int secondTimeFrame = 300;
    multiCastStream
    .GroupBy(instrument => instrument.Symbol)
    .SelectMany(q => q)
    .Buffer(TimeSpan.FromSeconds(secondTimeFrame))
    .Where(messages => messages.Any())
    .SubscribeOn(els)
    .ObserveOn(dispatcher)
    .Select((sr) => DoCalc(sr, secondTimeFrame))
    .Subscribe((en) => { if (null != en) Console.WriteLine(en); });

这也按预期工作。但是,如果我运行两个代码,则会出现意外行为。

分享Hot Observables时我缺少一些基本的东西吗?

编辑 1

使用 Aron 的答案修改代码后,我得到:

Number of quotes 1
60: 6/20/2019 10:53:26 PM=> M2KU9 Stats.
Number of quotes 1
300: 6/20/2019 10:53:26 PM=> M2KU9 Stats.
Number of quotes 40
60: 6/20/2019 10:54:26 PM=> MNQU9 Stats.

然后,不会打印其他统计信息。

一切都按预期工作,你的逻辑有缺陷。为了演示,我将绘制一个Rx大理石图

Quote   -----------x--------------x-----------x-----------x--
Buffer1 |   |   |   |   |   |   |   |   |   |   |   |   |   |
Buffer2   |       |       |       |       |       |       |   
Where 1 ------------x---------------x-----------x-----------x
Where 2 ----------------------------------x-------x-----------
                    ^Notice It seems like there should be an event on 2
                     But there wasn't?

相反,您可能要同步这些

var synchronizationSource = Observerable.Timer(TimeSpan.FromSeconds(1),TimeSpan.FromSeconds(1))
     .Publish()
using(synchronizationSource.Connect())
{
       IObservable<Stat> CreateTimeFrame(int seconds)
       {
           var bufferTimeFrame = synchronizationSource
                         .Where(i => i % seconds == 0);
           return priceChangedObservable
               .GroupBy(instrument => instrument.Symbol)
               .SelectMany(q => q)            
               .Buffer(bufferTimeFrame)
               .Where(messages => messages.Any())
               .ObserveOn(dispatcher)
               .Select((sr) => DoCalc(sr, timeFrame))
       }
       CreateTimeFrame(60)
          .Subscribe((en) => { if (null != en) Console.WriteLine(en); });
       CreateTimeFrame(300)
          .Subscribe((en) => { if (null != en) Console.WriteLine(en); });

       Thread.Sleep(100000);
 }

相关内容

  • 没有找到相关文章

最新更新