反应性有条件的节流操作员



我正在寻找像油门这样的运算符,除非仅在布尔值为真时进行节流行为。因此,给定两个可观察的IObservable<T> valuesIObservable<bool> throttleCondition,我想创建一个可观察到的可执行以下操作:

  • values中的所有值都通过,直到
  • throttleCondition产生true时。那么直到
  • throttleCondition再次产生false时。然后,序列的最后值被传播,任何新值再次通过。

或在大理石图中表达(请注意4(:

           values |   0 1   2 3 4   5 6
throttleCondition | F     T       F
           result |   0 1         4 5 6 

我最接近的是:

public static IObservable<T> ThrottleWhen<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{
    bool cond = false;
    throttleCondition.Subscribe(v => cond = v);
    return self.Window(throttleCondition).Select(obs => cond ? obs.TakeLast(1) : obs).SelectMany(obs => obs);
}

但这不是线程安全,因为选择和订阅之间可能会有一场比赛。有没有人有什么建议?也许已经有一个我没有看到的操作员?

编辑:这是我需要的功能的单元测试:

[TestMethod]
public void TestThrottleWhen()
{
    //Setup
    var scheduler = new TestScheduler();
    Subject<int> numberValues = new Subject<int>();
    Subject<bool> flagValues = new Subject<bool>();
    //Define actions
    scheduler.Schedule(TimeSpan.FromTicks(1), () => flagValues.OnNext(false));
    scheduler.Schedule(TimeSpan.FromTicks(10), () => numberValues.OnNext(0));
    scheduler.Schedule(TimeSpan.FromTicks(20), () => numberValues.OnNext(1));
    scheduler.Schedule(TimeSpan.FromTicks(30), () => flagValues.OnNext(true));
    scheduler.Schedule(TimeSpan.FromTicks(40), () => numberValues.OnNext(2));
    scheduler.Schedule(TimeSpan.FromTicks(50), () => numberValues.OnNext(3));
    scheduler.Schedule(TimeSpan.FromTicks(60), () => numberValues.OnNext(4));
    scheduler.Schedule(TimeSpan.FromTicks(70), () => flagValues.OnNext(false));
    scheduler.Schedule(TimeSpan.FromTicks(71), () => flagValues.OnNext(true));
    scheduler.Schedule(TimeSpan.FromTicks(72), () => flagValues.OnNext(false));
    scheduler.Schedule(TimeSpan.FromTicks(80), () => numberValues.OnNext(5));
    scheduler.Schedule(TimeSpan.FromTicks(90), () => numberValues.OnNext(6));
    var actual = scheduler.Start(() => numberValues.ThrottleWhen(flagValues), 0, 0, 100);
    //Assert
    var expected = new[]
    {
        ReactiveTest.OnNext(10, 0),
        ReactiveTest.OnNext(20, 1),
        ReactiveTest.OnNext(70, 4),
        ReactiveTest.OnNext(80, 5),
        ReactiveTest.OnNext(90, 6)
    };
    ReactiveAssert.AreElementsEqual(expected, actual.Messages);
}

编辑2:我最终使用了Alex'答案的修改版本:

public static IObservable<T> ThrottleWhen<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{
    var isPaused = throttleCondition.Prepend(false).DistinctUntilChanged();
    return Observable.Defer(() =>
    {
        object lockObj = new object();
        bool gateIsOpen = false;
        return Observable.CombineLatest(
                self.Synchronize(lockObj).Do(_ => gateIsOpen = true),
                isPaused.Synchronize(lockObj).Do(paused => gateIsOpen = !paused && gateIsOpen),
                (number, paused) => (number, paused)
            )
            .Where(tuple => !tuple.paused && gateIsOpen)
            .Select(tuple => tuple.number);
    });
}

我认为这更干净,但是在情人眼中干净:

在核心上,解决方案看起来像这样:

public static IObservable<T> ThrottleWhenCore<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{
    return throttleCondition
        .StartWith(false)
        .Join(self,
            b => b ? Observable.Empty<bool>() : throttleCondition,
            t => self,
            (b, t) => (b, t)
        )
        .Where(tuple => !tuple.b)
        .Select(tuple => tuple.t);
}

Join尚不很好地理解:对于从左侧观察到的每个项目(此处的throttle Condition(都打开一个左窗口。从正确的观察值(self(引入的每个项目都打开一个正确的窗口。每当左窗与右窗口及时相交时,都会产生一个新值。

自然,您可以选择何时关闭Windows:在我们的情况下,self中的任何项目都关闭了最新的右窗口,因此在第一个项目之后,总是有一个和只有一个右窗口打开。对于throttleCondition左侧,当值为false时,窗口保持打开状态(允许在f升起时所有值(通过。当throttleCondition为True时,我们将打开并立即关闭窗口,这仅允许最新值通过。

这将通过所有测试案例,除了您打开并关闭油门的情况。在这种情况下,它将在72 tick标记处倍增4。为了解决这个问题,您添加索引器和DistinctUntilChanged

public static IObservable<T> ThrottleWhen<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{
    return throttleCondition
        .StartWith(false)
        .Join(self.Select((item, index) => (item, index)),
            b => b ? Observable.Empty<bool>() : throttleCondition,
            t => self,
            (b, t) => (b, t)
        )
        .Select(tuple => tuple.t)
        .DistinctUntilChanged()
        .Select(tuple => tuple.item);
}

如果您需要更多的解释,请告诉我。我更喜欢锁定/状态变量。

我希望这将是最终尝试... ^^

回到 combineLatest 方法并结合了一些解决方案,我想出了一个额外的标志,可以帮助我们避免拐角处,而旗帜在不用任何任何方面都在来回更改从可观察到的值发出的值。

bool numberChangedFlag = false;
_numberValues
    .Do(_ => numberChangedFlag = true)
    .CombineLatest(_flagValues.Do(x => numberChangedFlag = !x && numberChangedFlag), (number, flag) => (number, flag))
    .Where(tuple => !tuple.flag && numberChangedFlag)
    .Select(tuple => tuple.number)
    .Subscribe(DoYourMagic);

我已经尝试了您的测试,看起来好像有效。

尽管我不喜欢我们需要一个本地的助手变量来解决这个问题,但要深入研究,并创建一个可观察的助手似乎会使代码变得复杂。

让我知道这次是否有效。:(

这是另一种可能性,在 @Enigmativity的评论中构建。考虑这一点的最佳方法是将其分为两个问题:当条件变为真时,在条件是错误的情况下自由排放,然后将两者合并在一起。

public static IObservable<T> ThrottleWhen<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{
    return throttleCondition.Publish(_throttleCondition => self.Publish(_self => Observable.Merge(
        _throttleCondition   //Get latest when true handler
            .Where(b => !b)
            .WithLatestFrom(
                _self.Select((item, index) => (item, index)),
                (_, t) => t
            )
            .DistinctUntilChanged()
            .Select(t => t.item),
        _throttleCondition   //Freely emit when false, default start with false.
            .StartWith(false)
            .Select(b => b ? Observable.Empty<T>(): _self)
            .Switch()
    )));
}

这使用与我的其他答案相同的索引技巧来防止重复。

最新更新