仅发射唯一的值,或者如果自上次发射以来已经过了一段时间



假设我有一个IObservable<int> values(在指定时间(接收这些值:

Time (secs)  Value
1            4
2            3
3            5
4            5
10           5
11           6

更新:显示时间列只是为了举例说明接收值的时间。Observable只包含简单的int值

如果:,我想输出一个值

  1. 它与以前的值不同,或者
  2. 自上一个值以来已超过5秒

预期输出:4、3、5、5、6

所以类似于:

void Foo(IObservable<int> values)
{
...
values
.Something()
.Do(x => Console.WriteLine(x))
...
}

我感觉到,我需要GroupByUntilChanged()Throttle(TimeSpan.FromSeconds(5))的一些组合,但我的反应符相当生疏。。。

这两个条件本身就很容易,合并起来有点棘手:

与以前的值不同:

source.DistintUntilChanged()

安静5秒时输出:

source
.TimeInterval()
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)

对于合并,我会应用一个索引器,合并两个独立的解决方案,然后在索引器上过滤以删除重复项,如下所示:

var result = source
.Select((val, index) => (val, index))           // Add indexer
.Publish(_indexedSource => Observable.Merge(
_indexedSource
.DistinctUntilChanged(t => t.val),
_indexedSource
.TimeInterval()
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)
))
.DistinctUntilChanged(t => t.index)            // Filter out items with same index
.Select(t => t.val);                           // Remove indexing

如果你想测试它,你需要将调度器传递到TimeInterval函数中,它看起来像这样:

var ts = new TestScheduler();
// The sequence described in question.
var source = ts.CreateHotObservable(
ReactiveTest.OnNext(1 * TimeSpan.TicksPerSecond, 4),
ReactiveTest.OnNext(2 * TimeSpan.TicksPerSecond, 3),
ReactiveTest.OnNext(3 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(4 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(10 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(11 * TimeSpan.TicksPerSecond, 6),
ReactiveTest.OnCompleted<int>(12 * TimeSpan.TicksPerSecond)
);
// solution
var testResult = source
.Select((val, index) => (val, index))
.Publish(_indexedSource => Observable.Merge(
_indexedSource
.DistinctUntilChanged(t => t.val),
_indexedSource
.TimeInterval(ts)
.Where(timedT => timedT.Interval > TimeSpan.FromSeconds(5))
.Select(timedT => timedT.Value)
))
.DistinctUntilChanged(t => t.index)
.Select(t => t.val);
// Test assertions
var observer = ts.CreateObserver<int>();
testResult.Subscribe(observer);
var expected = ts.CreateHotObservable(
ReactiveTest.OnNext(1 * TimeSpan.TicksPerSecond, 4),
ReactiveTest.OnNext(2 * TimeSpan.TicksPerSecond, 3),
ReactiveTest.OnNext(3 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(10 * TimeSpan.TicksPerSecond, 5),
ReactiveTest.OnNext(11 * TimeSpan.TicksPerSecond, 6),
ReactiveTest.OnCompleted<int>(12 * TimeSpan.TicksPerSecond)
);
ts.Start();
ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);

仔细想想,我的一位同事提醒我的力量。组合使用Scan((和DistinctUntilChanged((。因此,为了子孙后代,我也将在这里添加这个解决方案:

var result = source
.TimeInterval(scheduler)
.Scan((prev, curr) =>
{
if (prev.Value != curr.Value || curr.Interval > TimeSpan.FromSeconds(5))
return curr;
else
return prev;
})
.Select(x => x.Value)
.DistinctUntilChanged();

反应式扩展真正遵循Perl的老格言TIMTOWTDI。;-(

最新更新