我应该如何解析和延迟热反应式扩展 IConnectableObservable<T> 流?



我无法实现这一点,所以不幸的是我没有Rx代码来显示我在哪里。我将解释我想做什么,希望有人有足够的专业知识可能能够引导我在正确的方向。

假设我有一个从数据库中提取的List TimeObjects集合。TimeObject有这样的形式。

public class TimeObject
{
   public    DateTime time{get;set;}
   public    decimal  price{get;set;}
}

我想做的是生成一个Hot IConnectableObservable流,其中每个元素由一个可变的时间延迟分隔。

可变时间延迟等于当前和下一个TimeObject的时间之差。

例如,如果流是:

s1, s2, s3,…sN,其中每个's'表示TimeObject

的一个元素

则,s2被推送到流之前的时间延迟为(s2)。Time(时间);s3被推送到流之前的时间延迟是(s3)。Time - s2.time)等。

这是可能的吗?如果是,我的实现应该是什么样子?

我包含了一个for循环,它使用经典方法完成相同的任务。

        FireEvent(TimeObject[0]); //Post the first element as an event
        for(int _counter = 0; _counter < TimeObject.Count; _counter++)
        {
            if ( _counter + 1 < TimeObject.Count )
            {
                TimeObject timeObjectA = TimeObject[_counter];
                TimeObject timeObjectB = TimeObject[_counter + 1];
                int millisecondsDelay = (timeObjectB.time - timeObjectA.time).Milliseconds;
                Thread.Sleep(millisecondsDelay);
                FireEvent(timeObjectB);   //this posts the latest tick as an event with variable time delay.
            }

我想把它转换成一个Observable流。

谢谢。

试试这个:

var observable =
    Observable
        .Generate(
            1,
            x => x < timeObjects.Count,
            x => x + 1, x => timeObjects[x],
            x => timeObjects[x].time.Subtract(timeObjects[x - 1].time)))
        .StartWith(timeObjects[0]);

这是一个常规的冷观测。你可以用.Publish()来使它变热。你为什么要热的?

它还要求你的列表至少有两个元素

相关内容

  • 没有找到相关文章

最新更新