我正在尝试定期合并两个传感器数据流,但在 Rx 中无法正确执行此操作。 我想出的最好的是下面的示例,但我怀疑这是 Rx 的最佳使用。
有没有更好的方法?
我尝试过 Sample(),但传感器以不规则的间隔生成值,慢(>1 秒)和快速(<1 秒)。 Sample() 似乎只处理快速数据。
Observable<SensorA> sensorA = ... /* hot */
Observable<SensorB> sensorB = ... /* hot */
SensorA lastKnownSensorA;
SensorB lastKnownSensorB;
sensorA.Subscribe(s => lastKnownSensorA = s);
sensorB.Subscribe(s => lastKnownSensorB = s);
var combined = Observable.Interval(TimeSpan.FromSeconds(1))
.Where(t => _lastKnownSensorA != null)
.Select(t => new SensorAB(lastKnownSensorA, lastKnownSensorB)
我认为@JonasChapuis的答案可能是你所追求的,但有几个问题可能是有问题的:
- 在所有源每个源至少发出
一个值之前,
CombineLatest
不会发出值,这可能会导致在此之前较快的源丢失数据。这可以通过使用StartWith
在每个传感器流上设定空对象或默认值来缓解。如果在采样周期内未观察到新值,则
Sample
不会发出值。我无法从这个问题中判断这是否可取,但如果不是,则有一个有趣的技巧可以使用"节奏"流来解决这个问题,如下所述以创建固定频率,而不是通过Sample
获得的最大频率。
为了解决CombineLatest
问题,请为传感器流确定适当的 null 值 - 我通常通过类型上的静态 Null
属性提供这些值 - 这使得意图非常明确。对于值类型,使用 Nullable<T>
也是一个不错的选择:
Observable<SensorA> sensorA = ... .StartWith(SensorA.Null);
Observable<SensorB> sensorB = ... .StartWith(SensorB.Null);
注意:不要犯仅将StartWith
应用于CombinedLatest
输出的常见错误......这无济于事!
现在,如果您需要定期结果(自然可能包括最近读数的重复),请创建一个以所需间隔发出的"节奏"流:
var pace = Observable.Interval(TimeSpan.FromSeconds(1));
然后按如下方式组合,从结果中省略配速值:
var sensorReadings = Observable.CombineLatest(
pace, sensorA, sensorB,
(_, a, b) => new SensorAB(a,b));
同样值得了解的是MostRecent
运算符,如果您想以特定流的速度驱动输出,它可以非常有效地与Zip
结合使用。请参阅我演示该方法的这些答案: 如何将缓慢移动的可观察量与快速移动的可观察量的最新值以及处理多个流的更有趣的调整相结合:如何组合三个可观察量,以便
每次生成一个值时,使用 CombineLatest()
运算符合并传感器的最新值,然后Sample()
以确保每秒一次测量的最大频率如何?
sensorA.CombineLatest(sensorB, (a, b) => new {A=a, B=b}).Sample(TimeSpan.FromSeconds(1))