我有一个IObservable<String>
。
我正在尝试检测(并处理)同一字符串在短时间内被通知的情况。
我想要一个过滤器/流/可观察的,这样,如果同一个字符串在250毫秒内被通知,它只通知一次。
不太确定从哪里开始。
这里有一个相当紧凑的解决方案。你的帖子对持续时间是否会在一个不同的值到达后立即重置有点令人困惑——所以我为这两种解释提供了两种解决方案。
变体1-不同的"中间值"不会重置计时器
这是指你严格关心抑制的持续时间,而不关心是否有任何"中间"值(根据McGarnagle的解决方案)——也就是说,如果你很快得到"a", "b" ,"a"
,你仍然想抑制第二个"a"
。幸运的是,这对于GroupByUntil
来说非常容易,它在持续时间内分组并发射每组的第一个元素:
public static IObservable<T> DistinctUntilChanged<T>(
this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
if (scheduler == null) scheduler = Scheduler.Default;
return source.GroupByUntil(k => k,
_ => Observable.Timer(duration, scheduler))
.SelectMany(y => y.FirstAsync());
}
如果你想知道方法名称,我首先想到了变体2b;我保留了上面的名称,所以单元测试仍然通过。它可能需要一个更好的名称,比如SuppressDuplicatesWithinWindow
或类似的。。。
变化2a-"介于"不同值DO重置计时器
这稍微复杂一点-现在不同组中的任何事件都将结束给定组。我使用Publish().RefCount()组合来防止对源的多个订阅,并且必须非常小心使用null:
public static IObservable<T> DistinctUntilChanged<T>(
this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
if (scheduler == null) scheduler = Scheduler.Default;
var sourcePub = source.Publish().RefCount();
return sourcePub.GroupByUntil(
k => k,
x => Observable.Timer(duration, scheduler)
.TakeUntil(
sourcePub.Where(i => ReferenceEquals(null, i)
? !ReferenceEquals(null, x.Key)
: !i.Equals(x.Key))))
.SelectMany(y => y.FirstAsync());
}
变体2b
这是我尝试过的最初方法,我添加了它,因为它现在并没有那么糟糕,因为我对2a的改进使它更加复杂:
它是接受持续时间的Observable.DistinctUntilChanged
的变体。给定一个事件,在该持续时间内的连续重复事件将被抑制。如果不同的事件到达,或者某个事件在该持续时间之外到达,则会发出该事件并重置抑制计时器。
它通过使用接受IEqualityComparer的DistinctUntilChanged
的重载来工作。如果值匹配并且时间戳在指定的持续时间内,比较器会认为应用了时间戳的事件是相等的。
public static partial class ObservableExtensions
{
public static IObservable<T> DistinctUntilChanged<T>(
this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
if (scheduler == null) scheduler = Scheduler.Default;
return source.Timestamp(scheduler)
.DistinctUntilChanged(new Comparer<T>(duration))
.Select(ts => ts.Value);
}
private class Comparer<T> : IEqualityComparer<Timestamped<T>>
{
private readonly TimeSpan _duration;
public Comparer(TimeSpan duration)
{
_duration = duration;
}
public bool Equals(Timestamped<T> x, Timestamped<T> y)
{
if (y.Timestamp - x.Timestamp > _duration) return false;
return ReferenceEquals(x.Value, y.Value)
&& !ReferenceEquals(null,x.Value)
&& x.Value.Equals(y.Value);
}
public int GetHashCode(Timestamped<T> obj)
{
if (ReferenceEquals(null,obj.Value)) return obj.Timestamp.GetHashCode();
return obj.Value.GetHashCode() ^ obj.Timestamp.GetHashCode();
}
}
}
以下是我使用的单元测试(包括nuget包rx-testing和nunit):
public class TestDistinct : ReactiveTest
{
[Test]
public void DuplicateWithinDurationIsSupressed()
{
var scheduler = new TestScheduler();
var source =scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(200, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"));
}
[Test]
public void NonDuplicationWithinDurationIsNotSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(200, "b"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100,"a"),
OnNext(200,"b"));
}
[Test]
public void DuplicateAfterDurationIsNotSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(400, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(400, "a"));
}
[Test]
public void NonDuplicateAfterDurationIsNotSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(400, "b"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(400, "b"));
}
[Test]
public void TestWithSeveralValues()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(200, "a"),
OnNext(300, "b"),
OnNext(350, "c"),
OnNext(450, "b"),
OnNext(900, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(300, "b"),
OnNext(350, "c"),
OnNext(450, "b"),
OnNext(900, "a"));
}
[Test]
public void CanHandleNulls()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(400, (string)null),
OnNext(500, "b"),
OnNext(600, (string)null),
OnNext(700, (string)null));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(400, (string)null),
OnNext(500, "b"),
OnNext(600, (string)null));
}
[Test]
public void TwoDuplicatesWithinDurationAreSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(150, "a"),
OnNext(200, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"));
}
[Test]
public void TwoNullDuplicatesWithinDurationAreSupressed()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, (string)null),
OnNext(150, (string)null),
OnNext(200, (string)null));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, (string)null));
}
}
最后,为了完整性,变体1将通过TestWithSeveralValues测试的以下变体:
[Test]
public void TestWithSeveralValuesVariation1()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, "a"),
OnNext(200, "a"),
OnNext(300, "b"),
OnNext(350, "c"),
OnNext(450, "b"),
OnNext(900, "a"));
var duration = TimeSpan.FromTicks(250);
var results = scheduler.CreateObserver<string>();
source.DistinctUntilChanged(duration, scheduler).Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(300, "b"),
OnNext(350, "c"),
OnNext(900, "a"));
}
空测试将在最后更改为:
results.Messages.AssertEqual(
OnNext(100, "a"),
OnNext(400, (string)null),
OnNext(500, "b"),
OnNext(700, (string)null)); /* This line changes */
您正在寻找Observable.Throttle
忽略可观察序列中的值,这些值在指定源和dueTime的到期时间之前后跟另一个值。
编辑
好的,所以上面的内容只适用于对序列中的所有元素进行节流,而不是根据OP通过键。我认为这将是一个简单的下一步,但可能没有那么多?(F#有一个split
函数会很有帮助,但显然没有等价的C#。)
因此,这里有一个实现Split
:的尝试
public static class Extension
{
public static IDisposable SplitSubscribe<T, TKey>(
this IObservable<T> source,
Func<T, TKey> keySelector,
Action<IObservable<TKey>> subscribe)
{
// maintain a list of Observables, one for each key (TKey)
var observables = new ConcurrentDictionary<TKey, Subject<TKey>>();
// function to create a new Subject
Func<TKey, Subject<TKey>> createSubject = key =>
{
Console.WriteLine("Added for " + key);
var retval = new Subject<TKey>();
subscribe(retval);
retval.OnNext(key);
return retval;
};
// function to update an existing Subject
Func<TKey, Subject<TKey>, Subject<TKey>> updateSubject = (key, existing) =>
{
Console.WriteLine("Updated for " + key);
existing.OnNext(key);
return existing;
};
return source.Subscribe(next =>
{
var key = keySelector(next);
observables.AddOrUpdate(key, createSubject, updateSubject);
});
// TODO dispose of all subscribers
}
// special case: key selector is just the item pass-through
public static IDisposable SplitSubscribe<T>(
this IObservable<T> source,
Action<IObservable<T>> subscribe)
{
return source.SplitSubscribe(item => item, subscribe);
}
}
使用此函数,您可以拆分一个可观测的源,然后对每个源进行节流。用法如下:
IObservable<string> dummyObservable = new string[] { "a", "b", "a", "b", "b", "c", "a" }.ToObservable();
dummyObservable.SplitSubscribe(next =>
next.Throttle(TimeSpan.FromMilliseconds(250)).Subscribe(Console.WriteLine));
输出(未维护原始订单)
为添加为b添加已为更新为b更新为b更新为c添加已为更新一cb