我正在寻找一个可观察的扩展方法来做反向节流。我的意思是让第一个项目通过,然后在规定时间内忽略后面的项目。
input - due time 2
|*.*.*..*..|
output
|*......*..|
请注意,这是一个不同于下面的问题(它们都是相同的)。下面的问题需要一个固定的抑制持续时间,而我需要一个抑制持续时间,每次新物品过早到达时都会增加。从视觉上看,下面列出的解决方案的输出如下:
input - due time 2
|*.*.*..*..|
output
|*...*..*..|
- 如何采取第一次发生,然后抑制事件2秒(RxJS)
- 如何使用RX节流事件流?
- Rx:我如何立即响应,并限制后续请求
我提出了以下解决方案,但是我对调度器和并发性了解不够,无法确保锁定足够好。当一个Scheduler
参数被添加到方法中时,我也不知道如何实现这个方法。
public static IObservable<T> InverseThrottle<T>(this IObservable<T> source, TimeSpan dueTime)
{
IDisposable coolDownSupscription = null;
object subscriptionLock = new object();
return source
.Where(i =>
{
lock (subscriptionLock)
{
bool result;
if (coolDownSupscription == null)
{
result = true;
}
else
{
coolDownSupscription.Dispose();
result = false;
}
coolDownSupscription = Observable
.Interval(dueTime)
.Take(1)
.Subscribe(_ =>
{
lock (subscriptionLock)
{
coolDownSupscription = null;
}
});
return result;
}
});
}
你可以这样写…
source
.GroupByUntil(
x => Unit.Default,
x => x.Throttle(TimeSpan.FromSeconds(100))
)
.SelectMany(
x => x.ToList().Take(1) // yields first item on completion of the observable.
);
我推荐这个。
public static class IObservable_FirstThenThrottle
{
public static IObservable<TSource> FirstThenThrottle<TSource>(this IObservable<TSource> source, TimeSpan dueTime)
{
var first = source.Take(1);
var second = source.Skip(1).Throttle(dueTime);
return first.Merge(second);
}
}
当第一个条目进入时触发。然后用dueTime
节流剩余的序列。
这是一个大理石图,显示了dueTime = 2
的情况。
source 0-1-2--3--|
result 0------2--3--|