我正在寻找像油门这样的运算符,除非仅在布尔值为真时进行节流行为。因此,给定两个可观察的IObservable<T> values
和IObservable<bool> throttleCondition
,我想创建一个可观察到的可执行以下操作:
-
values
中的所有值都通过,直到 - 当
throttleCondition
产生true
时。那么直到 - 当
throttleCondition
再次产生false
时。然后,序列的最后值被传播,任何新值再次通过。
或在大理石图中表达(请注意4(:
values | 0 1 2 3 4 5 6
throttleCondition | F T F
result | 0 1 4 5 6
我最接近的是:
public static IObservable<T> ThrottleWhen<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{
bool cond = false;
throttleCondition.Subscribe(v => cond = v);
return self.Window(throttleCondition).Select(obs => cond ? obs.TakeLast(1) : obs).SelectMany(obs => obs);
}
但这不是线程安全,因为选择和订阅之间可能会有一场比赛。有没有人有什么建议?也许已经有一个我没有看到的操作员?
编辑:这是我需要的功能的单元测试:
[TestMethod]
public void TestThrottleWhen()
{
//Setup
var scheduler = new TestScheduler();
Subject<int> numberValues = new Subject<int>();
Subject<bool> flagValues = new Subject<bool>();
//Define actions
scheduler.Schedule(TimeSpan.FromTicks(1), () => flagValues.OnNext(false));
scheduler.Schedule(TimeSpan.FromTicks(10), () => numberValues.OnNext(0));
scheduler.Schedule(TimeSpan.FromTicks(20), () => numberValues.OnNext(1));
scheduler.Schedule(TimeSpan.FromTicks(30), () => flagValues.OnNext(true));
scheduler.Schedule(TimeSpan.FromTicks(40), () => numberValues.OnNext(2));
scheduler.Schedule(TimeSpan.FromTicks(50), () => numberValues.OnNext(3));
scheduler.Schedule(TimeSpan.FromTicks(60), () => numberValues.OnNext(4));
scheduler.Schedule(TimeSpan.FromTicks(70), () => flagValues.OnNext(false));
scheduler.Schedule(TimeSpan.FromTicks(71), () => flagValues.OnNext(true));
scheduler.Schedule(TimeSpan.FromTicks(72), () => flagValues.OnNext(false));
scheduler.Schedule(TimeSpan.FromTicks(80), () => numberValues.OnNext(5));
scheduler.Schedule(TimeSpan.FromTicks(90), () => numberValues.OnNext(6));
var actual = scheduler.Start(() => numberValues.ThrottleWhen(flagValues), 0, 0, 100);
//Assert
var expected = new[]
{
ReactiveTest.OnNext(10, 0),
ReactiveTest.OnNext(20, 1),
ReactiveTest.OnNext(70, 4),
ReactiveTest.OnNext(80, 5),
ReactiveTest.OnNext(90, 6)
};
ReactiveAssert.AreElementsEqual(expected, actual.Messages);
}
编辑2:我最终使用了Alex'答案的修改版本:
public static IObservable<T> ThrottleWhen<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{
var isPaused = throttleCondition.Prepend(false).DistinctUntilChanged();
return Observable.Defer(() =>
{
object lockObj = new object();
bool gateIsOpen = false;
return Observable.CombineLatest(
self.Synchronize(lockObj).Do(_ => gateIsOpen = true),
isPaused.Synchronize(lockObj).Do(paused => gateIsOpen = !paused && gateIsOpen),
(number, paused) => (number, paused)
)
.Where(tuple => !tuple.paused && gateIsOpen)
.Select(tuple => tuple.number);
});
}
我认为这更干净,但是在情人眼中干净:
在核心上,解决方案看起来像这样:
public static IObservable<T> ThrottleWhenCore<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{
return throttleCondition
.StartWith(false)
.Join(self,
b => b ? Observable.Empty<bool>() : throttleCondition,
t => self,
(b, t) => (b, t)
)
.Where(tuple => !tuple.b)
.Select(tuple => tuple.t);
}
Join
尚不很好地理解:对于从左侧观察到的每个项目(此处的throttle
Condition
(都打开一个左窗口。从正确的观察值(self
(引入的每个项目都打开一个正确的窗口。每当左窗与右窗口及时相交时,都会产生一个新值。
自然,您可以选择何时关闭Windows:在我们的情况下,self
中的任何项目都关闭了最新的右窗口,因此在第一个项目之后,总是有一个和只有一个右窗口打开。对于throttleCondition
左侧,当值为false时,窗口保持打开状态(允许在f升起时所有值(通过。当throttleCondition
为True时,我们将打开并立即关闭窗口,这仅允许最新值通过。
这将通过所有测试案例,除了您打开并关闭油门的情况。在这种情况下,它将在72 tick标记处倍增4。为了解决这个问题,您添加索引器和DistinctUntilChanged
:
public static IObservable<T> ThrottleWhen<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{
return throttleCondition
.StartWith(false)
.Join(self.Select((item, index) => (item, index)),
b => b ? Observable.Empty<bool>() : throttleCondition,
t => self,
(b, t) => (b, t)
)
.Select(tuple => tuple.t)
.DistinctUntilChanged()
.Select(tuple => tuple.item);
}
如果您需要更多的解释,请告诉我。我更喜欢锁定/状态变量。
我希望这将是最终尝试... ^^
回到 combineLatest 方法并结合了一些解决方案,我想出了一个额外的标志,可以帮助我们避免拐角处,而旗帜在不用任何任何方面都在来回更改从可观察到的值发出的值。
bool numberChangedFlag = false;
_numberValues
.Do(_ => numberChangedFlag = true)
.CombineLatest(_flagValues.Do(x => numberChangedFlag = !x && numberChangedFlag), (number, flag) => (number, flag))
.Where(tuple => !tuple.flag && numberChangedFlag)
.Select(tuple => tuple.number)
.Subscribe(DoYourMagic);
我已经尝试了您的测试,看起来好像有效。
尽管我不喜欢我们需要一个本地的助手变量来解决这个问题,但要深入研究,并创建一个可观察的助手似乎会使代码变得复杂。
让我知道这次是否有效。:(
这是另一种可能性,在 @Enigmativity的评论中构建。考虑这一点的最佳方法是将其分为两个问题:当条件变为真时,在条件是错误的情况下自由排放,然后将两者合并在一起。
。public static IObservable<T> ThrottleWhen<T>(this IObservable<T> self, IObservable<bool> throttleCondition)
{
return throttleCondition.Publish(_throttleCondition => self.Publish(_self => Observable.Merge(
_throttleCondition //Get latest when true handler
.Where(b => !b)
.WithLatestFrom(
_self.Select((item, index) => (item, index)),
(_, t) => t
)
.DistinctUntilChanged()
.Select(t => t.item),
_throttleCondition //Freely emit when false, default start with false.
.StartWith(false)
.Select(b => b ? Observable.Empty<T>(): _self)
.Switch()
)));
}
这使用与我的其他答案相同的索引技巧来防止重复。