假设我有一个IObservable<int> values
(在指定时间(接收这些值:
Time (secs) Value
1 4
2 3
3 5
4 5
10 5
11 6
更新:显示时间列只是为了举例说明接收值的时间。Observable只包含简单的int值
如果:,我想输出一个值
- 它与以前的值不同,或者
- 自上一个值以来已超过5秒
预期输出:4、3、5、5、6
所以类似于:
void Foo(IObservable<int> values)
{
...
values
.Something()
.Do(x => Console.WriteLine(x))
...
}
我感觉到,我需要GroupByUntilChanged()
和Throttle(TimeSpan.FromSeconds(5))
的一些组合,但我的反应符相当生疏。。。
这两个条件本身就很容易,合并起来有点棘手:
与以前的值不同:
source.DistintUntilChanged()
安静5秒时输出:
source
.TimeInterval()
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)
对于合并,我会应用一个索引器,合并两个独立的解决方案,然后在索引器上过滤以删除重复项,如下所示:
var result = source
.Select((val, index) => (val, index)) // Add indexer
.Publish(_indexedSource => Observable.Merge(
_indexedSource
.DistinctUntilChanged(t => t.val),
_indexedSource
.TimeInterval()
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)
))
.DistinctUntilChanged(t => t.index) // Filter out items with same index
.Select(t => t.val); // Remove indexing
如果你想测试它,你需要将调度器传递到TimeInterval
函数中,它看起来像这样:
var ts = new TestScheduler();
// The sequence described in question.
var source = ts.CreateHotObservable(
ReactiveTest.OnNext(1 * TimeSpan.TicksPerSecond, 4),
ReactiveTest.OnNext(2 * TimeSpan.TicksPerSecond, 3),
ReactiveTest.OnNext(3 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(4 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(10 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(11 * TimeSpan.TicksPerSecond, 6),
ReactiveTest.OnCompleted<int>(12 * TimeSpan.TicksPerSecond)
);
// solution
var testResult = source
.Select((val, index) => (val, index))
.Publish(_indexedSource => Observable.Merge(
_indexedSource
.DistinctUntilChanged(t => t.val),
_indexedSource
.TimeInterval(ts)
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)
))
.DistinctUntilChanged(t => t.index)
.Select(t => t.val);
// Test assertions
var observer = ts.CreateObserver<int>();
testResult.Subscribe(observer);
var expected = ts.CreateHotObservable(
ReactiveTest.OnNext(1 * TimeSpan.TicksPerSecond, 4),
ReactiveTest.OnNext(2 * TimeSpan.TicksPerSecond, 3),
ReactiveTest.OnNext(3 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(10 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(11 * TimeSpan.TicksPerSecond, 6),
ReactiveTest.OnCompleted<int>(12 * TimeSpan.TicksPerSecond)
);
ts.Start();
ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);
仔细想想,我的一位同事提醒我的力量。组合使用Scan((和DistinctUntilChanged((。因此,为了子孙后代,我也将在这里添加这个解决方案:
var result = source
.TimeInterval(scheduler)
.Scan((prev, curr) =>
{
if (prev.Value != curr.Value || curr.Interval > TimeSpan.FromSeconds(5))
return curr;
else
return prev;
})
.Select(x => x.Value)
.DistinctUntilChanged();
反应式扩展真正遵循Perl的老格言TIMTOWTDI。;-(