我编写了测试来观察Distinc
操作
public class Test: ReactiveTest {
[Fact]
public void Observe_distint_nonDistinc() {
var scheduler = new TestScheduler();
var source = scheduler.CreateHotObservable(
OnNext(100, "a"),
OnNext(110, "b"),
OnNext(200, "a"),
OnNext(220, "c"),
OnNext(221, "a")
);
var results = scheduler.CreateObserver<string>();
source.Distinct().Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(OnNext(100,"a"),OnNext(110,"b"),OnNext(220,"c"));
}
}
测试通过正常,但是我不确定如何同时观察重复项。我尝试了Publish
和CombineLatest
的一些组合,但我觉得它们不值得一提。我的duplicate
流应该只有 2 个项目OnNext(200,"a"), OnNext(221,"a")
这是一个完整的解决方案:
[Fact]
public void ObserveDistinctNonDistinct()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateHotObservable(
OnNext(100, "a"),
OnNext(110, "b"),
OnNext(200, "a"),
OnNext(220, "c"),
OnNext(221, "a")
).Publish();
var distinctResults = scheduler.CreateObserver<string>();
source
.Distinct()
.Subscribe(distinctResults);
var nonDistinctResults = scheduler.CreateObserver<string>();
(from letter in source
group letter by letter
into groupedLetters
from count in groupedLetters
.Window(Observable.Never<string>())
.SelectMany(ol =>
ol.Scan(0, (c, _) => ++c))
where count > 1
select groupedLetters.Key)
.Distinct()
.Subscribe(nonDistinctResults);
source.Connect();
scheduler.AdvanceBy(1000);
distinctResults.Messages.AssertEqual(OnNext(100, "a"), OnNext(110, "b"), OnNext(220, "c"));
nonDistinctResults.Messages.AssertEqual(OnNext(200, "a"));
}
它在第二次出现任何重复项时匹配。
使用方法语法:
source
.GroupBy(s => s)
.SelectMany(g =>
g.Window(Observable.Never<string>())
.SelectMany(ol =>
ol.Scan(0, (c, _) => ++c))
.Where(l => l > 1)
.Select(_ => g.Key))
.Distinct()
.Subscribe(nonDistinctResults);
直到我找到一种更快的实现方式,我想发布我的自定义CollectDublicates
扩展方法。测试现在按预期通过。
public class Test : ReactiveTest {
[Fact]
public void Observe_distint_nonDistinc() {
var scheduler = new TestScheduler();
var source = scheduler.CreateHotObservable(
OnNext(100, "a"),
OnNext(110, "b"),
OnNext(200, "a"),
OnNext(220, "c"),
OnNext(221, "a")
).Publish();
var distinnctResults = scheduler.CreateObserver<string>();
source.Distinct().Subscribe(distinnctResults);
var duplicatesResults = scheduler.CreateObserver<string>();
source.CollectDuplicates().Subscribe(duplicatesResults);
source.Connect();
scheduler.AdvanceBy(1000);
distinnctResults.Messages.AssertEqual(OnNext(100, "a"), OnNext(110, "b"), OnNext(220, "c"));
duplicatesResults.Messages.AssertEqual(OnNext(200,"a"),OnNext(221,"a"));
}
}
public static class RxEx{
class DubplicateCollector<T> : IEqualityComparer<T> {
readonly Subject<T> _matches = new Subject<T>();
public IObservable<T> Matches => _matches;
public bool Equals(T x, T y) {
var @equals = x.Equals(y);
if (equals)
_matches.OnNext(x);
return @equals;
}
public int GetHashCode(T obj) {
return obj.GetHashCode();
}
}
public static IObservable<TSource> CollectDuplicates<TSource>(this IObservable<TSource> source) {
var dubplicateCollector = new DubplicateCollector<TSource>();
var compositeDisposable = new CompositeDisposable { source.Distinct(dubplicateCollector).Subscribe() };
return Observable.Create<TSource>(observer => {
var disposable = dubplicateCollector.Matches.Subscribe(observer.OnNext, observer.OnError, observer.OnCompleted);
compositeDisposable.Add(disposable);
return () => compositeDisposable.Dispose();
});
}
}