转发有冷却时间的Rx项目,当它们来得太快时切换到采样



我正在寻找Rx方法,该方法将接受一个可观察的结果,并将最新的物品置于"冷却"状态,这样,当物品进入速度慢于冷却时间时,它们会被转发,但当它们进入速度快时,你只会在每个冷却期后获得最新的值。

换句话说,当项目间隔小于t时间时,我想切换到周期为t的采样(当它们展开时切换回)。

这与Observable.Throttle的功能非常相似,只是每当新项目到达时,计时器不会重置。

我想到的应用程序是通过网络发送"最新价值"更新。我不想交流一个值,除非它发生了变化,我也不想对一个快速变化的值进行过多的垃圾邮件处理,以至于淹没了其他数据。

有标准的方法可以满足我的需求吗?

Strilanc,考虑到您对源流安静时不需要的活动的担忧,您可能会对这种调整事件的方法感兴趣——否则我不会添加此内容,因为我认为J.Lennon的实现非常合理(而且简单得多),并且计时器的性能不会受到影响。

该实现还有另一个有趣的区别——它与Sample方法不同,因为它会立即发出冷却期之外发生的事件,而不是在下一个采样间隔发生的事件。它在冷却时间之外没有定时器。

编辑-这里是解决Chris在评论中提到的问题的v3-它确保了在冷却期间发生的更改本身会触发一个新的冷却期。

public static IObservable<T> LimitRate<T>(
this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
return source.DistinctUntilChanged()
.GroupByUntil(k => 0,
g => Observable.Timer(duration, scheduler))
.SelectMany(x => x.FirstAsync()
.Merge(x.Skip(1)
.TakeLast(1)))
.Select(x => Observable.Return(x)
.Concat(Observable.Empty<T>()
.Delay(duration, scheduler)))
.Concat();
}

这是通过最初使用GroupByUntil在冷却期内将所有事件打包到同一组中来实现的。它监视更改,并在组过期时发出最终更改(如果有的话)。

然后,生成的事件被投影到流中,其OnCompleted被延迟了冷却期。然后将这些流连接在一起。这可以防止事件比冷却更紧密地联系在一起,但在其他情况下,它们会尽快发出。

以下是单元测试(针对v3编辑进行了更新),您可以使用nuget包rx-testingnunit:运行这些测试

public class LimitRateTests : ReactiveTest
{
[Test]
public void SlowerThanRateIsUnchanged()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(200, 1),
OnNext(400, 2),
OnNext(700, 3));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(200, 1),
OnNext(400, 2),
OnNext(700, 3));
}
[Test]
public void FasterThanRateIsSampled()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(140, 5),
OnNext(150, 2),
OnNext(300, 3),
OnNext(350, 4));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3),
OnNext(400, 4));
}
[Test]
public void DuplicatesAreOmitted()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(150, 1),
OnNext(300, 1),
OnNext(350, 1));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1));
}
[Test]
public void CoolResetsCorrectly()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(150, 2),
OnNext(205, 3));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3));
}
[Test]
public void MixedPacingWorks()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(150, 1),
OnNext(450, 3),
OnNext(750, 4),
OnNext(825, 5));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(450, 3),
OnNext(750, 4),
OnNext(850, 5));
}
}

您可以使用Observable.DistinctUntilChangedObservable.Sample

可观察。DistinctUntilChanged

只有当值与以前的值不同时,此方法才会显示这些值。(http://www.introtorx.com/content/v1.0.10621.0/05_Filtering.html)

可观察。样本

Sample方法只是为每个指定的TimeSpan取最后一个值。(http://www.introtorx.com/content/v1.0.10621.0/13_TimeShiftedSequences.html#Sample)

为了生成所需的效果,您可以将生成的第一个项目与上面描述的项目相结合。

我意识到这个问题已经解决了一段时间,但我想提供一个替代解决方案,我认为更准确地匹配原始需求。此解决方案引入了2个自定义运算符。

首先是SampleImmediate,它的工作原理与Sample完全相同,只是它会立即发送第一个项目。这是通过许多操作员完成的。Materialize/DematerializeDistinctUntilChanged协同工作以确保不发送重复的通知。MergeTake(1)Sample提供基本的"立即采样"功能。CCD_ 17和CCD_。CCD_ 19和CCD_ 20确保我们在启动计时器之前等待第一个事件发生。Create帮助我们妥善处理一切。

public static IObservable<T> SampleImmediate<T>(this IObservable<T> source, TimeSpan dueTime)
{
return source
.GroupBy(x => 0)
.SelectMany(group =>
{
return Observable.Create<T>(o =>
{
var connectable = group.Materialize().Publish();
var sub = Observable.Merge(
connectable.Sample(dueTime),
connectable.Take(1)
)
.DistinctUntilChanged()
.Dematerialize()
.Subscribe(o);
return new CompositeDisposable(connectable.Connect(), sub);
});
});
}

在我们有了SampleImmediate之后,我们可以通过使用GroupByUntil对所有发生的事件进行分组来创建Cooldown,直到我们的滑动Throttle窗口关闭。一旦我们有了团队,我们就简单地SampleImmediate整个事情。

public static IObservable<T> Cooldown<T>(this IObservable<T> source, TimeSpan dueTime)
{
return source
.GroupByUntil(x => 0, group => group.Throttle(dueTime))
.SelectMany(group => group.SampleImmediate(dueTime));
}

我绝不是说这个解决方案更好更快,我只是觉得看到一种替代方法可能会很好。

自我回答。

虽然我问的是Rx,但我的实际情况是它的端口(ReactiveCocoa)。更多的人知道Rx,我可以翻译。

无论如何,我最终直接实现了它,这样它就可以满足我想要的延迟/性能属性:

-(RACSignal*)cooldown:(NSTimeInterval)cooldownPeriod onScheduler:(RACScheduler *)scheduler {
need(cooldownPeriod >= 0);
need(!isnan(cooldownPeriod));
need(scheduler != nil);
need(scheduler != RACScheduler.immediateScheduler);
force(cooldownPeriod != 0); //todo: bother with no-cooldown case?
force(!isinf(cooldownPeriod)); //todo: bother with infinite case?
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
need(subscriber != nil);
NSObject* lock = [NSObject new];
__block bool isCoolingDown = false;
__block bool hasDelayedValue = false;
__block id delayedValue = nil;
__block RACDisposable *cooldownDisposer = nil;
void (^onCanSendValue)(void) = ^{
@synchronized (lock) {
// check that we were actually cooling down
// (e.g. what if the system thrashed before we could dispose the running-down timer, causing a redundant call?)
if (!isCoolingDown) {
return;
}
// if no values arrived during the cooldown, we do nothing and can stop the timer for now
if (!hasDelayedValue) {
isCoolingDown = false;
[cooldownDisposer dispose];
return;
}
// forward latest value
id valueToSend = delayedValue;
hasDelayedValue = false;
delayedValue = nil;
// todo: can this be avoided?
// holding a lock while triggering arbitrary actions cam introduce subtle deadlock cases...
[subscriber sendNext:valueToSend];
}
};
void (^preemptivelyEndCooldown)(void) = ^{
// forward latest value AND ALSO force cooldown to run out (disposing timer)
onCanSendValue();
onCanSendValue();
};
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
bool didStartCooldown;
@synchronized (lock) {
hasDelayedValue = true;
delayedValue = x;
didStartCooldown = !isCoolingDown;
isCoolingDown = true;
}
if (didStartCooldown) {
// first item gets sent right away
onCanSendValue();
// coming items have to wait for the timer to run down
cooldownDisposer = [[RACSignal interval:cooldownPeriod onScheduler:scheduler]
subscribeNext:^(id _) { onCanSendValue(); }];
}
} error:^(NSError *error) {
preemptivelyEndCooldown();
[subscriber sendError:error];
} completed:^{
preemptivelyEndCooldown();
[subscriber sendCompleted];
}];
return [RACDisposable disposableWithBlock:^{
[selfDisposable dispose];
@synchronized (lock) {
isCoolingDown = false;
[cooldownDisposer dispose];
}
}];
}] setNameWithFormat:@"[%@ cooldown:%@]", self.name, @(cooldownPeriod)];
}

它应该几乎直接翻译成.Net RX。当物品停止到达时,它将停止做任何工作,并在尊重冷却时间的情况下尽快转发物品。

相关内容

  • 没有找到相关文章

最新更新