反应性测试以观察不同值和重复值



我编写了测试来观察Distinct操作

public class Test : ReactiveTest {
    // Verifies that Distinct() forwards only the first occurrence of each
    // value from a hot observable driven by the TestScheduler.
    [Fact]
    public void Observe_Distinct_NonDistinct() {
        var scheduler = new TestScheduler();

        // Hot source: "a" repeats at virtual times 200 and 221.
        var source = scheduler.CreateHotObservable(
            OnNext(100, "a"),
            OnNext(110, "b"),
            OnNext(200, "a"),
            OnNext(220, "c"),
            OnNext(221, "a")
        );

        var results = scheduler.CreateObserver<string>();
        source.Distinct().Subscribe(results);
        scheduler.AdvanceBy(1000);

        // Repeats of "a" are suppressed; first occurrences pass through.
        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(110, "b"),
            OnNext(220, "c"));
    }
}

测试正常通过,但是我不确定如何同时观察重复项。我尝试了Publish和CombineLatest的一些组合,但我觉得它们不值得一提。我的duplicate流应该只包含2个项目:OnNext(200, "a")和OnNext(221, "a")。

这是一个完整的解决方案:

[Fact]
public void ObserveDistinctNonDistinct()
{
    var scheduler = new TestScheduler();
    // Publish() turns the hot observable into a connectable one so that the
    // distinct pipeline and the duplicate pipeline share one subscription
    // and see exactly the same notifications.
    var source = scheduler.CreateHotObservable(
        OnNext(100, "a"),
        OnNext(110, "b"),
        OnNext(200, "a"),
        OnNext(220, "c"),
        OnNext(221, "a")
    ).Publish();
    var distinctResults = scheduler.CreateObserver<string>();
    source
        .Distinct()
        .Subscribe(distinctResults);
    var nonDistinctResults = scheduler.CreateObserver<string>();
    // Group by value; Window(Never) keeps a single never-closing window per
    // group, Scan numbers each occurrence within it, and `count > 1` fires
    // from the second occurrence onward. The trailing Distinct() collapses
    // repeated keys, so each duplicated value is reported only once — hence
    // the single OnNext(200, "a") asserted below, not one per repeat.
    (from letter in source
        group letter by letter
        into groupedLetters
        from count in groupedLetters
            .Window(Observable.Never<string>())
            .SelectMany(ol =>
                ol.Scan(0, (c, _) => ++c))
        where count > 1
        select groupedLetters.Key)
    .Distinct()
    .Subscribe(nonDistinctResults);
    // Connect only after both observers are wired up, so neither misses values.
    source.Connect();
    scheduler.AdvanceBy(1000);
    distinctResults.Messages.AssertEqual(OnNext(100, "a"), OnNext(110, "b"), OnNext(220, "c"));
    nonDistinctResults.Messages.AssertEqual(OnNext(200, "a"));
}

它会在任何重复值第二次出现时产生匹配。

使用方法语法:

// Same duplicate-detection pipeline as the query expression, written with
// operator chaining: count occurrences per key and surface a key once it
// repeats; the final Distinct() reports each duplicated key only once.
source
    .GroupBy(letter => letter)
    .SelectMany(grouping => grouping
        .Window(Observable.Never<string>())
        .SelectMany(window => window.Scan(0, (count, _) => count + 1))
        .Where(count => count > 1)
        .Select(_ => grouping.Key))
    .Distinct()
    .Subscribe(nonDistinctResults);

在找到更快的实现方式之前,我想先发布我的自定义CollectDuplicates扩展方法。测试现在按预期通过。

public class Test : ReactiveTest {
    // End-to-end check: Distinct() yields first occurrences while
    // CollectDuplicates() yields every repeat, both fed from the same
    // shared (published) hot source.
    [Fact]
    public void Observe_Distinct_NonDistinct() {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateHotObservable(
            OnNext(100, "a"),
            OnNext(110, "b"),
            OnNext(200, "a"),
            OnNext(220, "c"),
            OnNext(221, "a")
        ).Publish();

        var distinctResults = scheduler.CreateObserver<string>();
        source.Distinct().Subscribe(distinctResults);

        var duplicatesResults = scheduler.CreateObserver<string>();
        source.CollectDuplicates().Subscribe(duplicatesResults);

        // Connect after both pipelines are subscribed so neither misses values.
        source.Connect();
        scheduler.AdvanceBy(1000);

        distinctResults.Messages.AssertEqual(
            OnNext(100, "a"), OnNext(110, "b"), OnNext(220, "c"));
        // Both repeats of "a" (at 200 and 221) are surfaced.
        duplicatesResults.Messages.AssertEqual(
            OnNext(200, "a"), OnNext(221, "a"));
    }
}
public static class RxEx {
    /// <summary>
    /// Emits every element of <paramref name="source"/> that has already been
    /// seen — the complement of <c>Distinct()</c>: the second, third, ...
    /// occurrence of each value is forwarded, the first is swallowed.
    /// </summary>
    /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
    /// <param name="source">The sequence to observe for duplicate values.</param>
    /// <returns>A sequence containing only the repeated occurrences.</returns>
    /// <remarks>
    /// Each subscription gets its own "seen" set, so the operator can safely
    /// be subscribed to more than once. Errors and completion are forwarded,
    /// and disposing the subscription unsubscribes from the source — all of
    /// which the previous comparer-side-effect implementation lost (its
    /// shared Subject never completed, its eager Subscribe leaked when the
    /// result was never subscribed, and its first disposal tore down the
    /// pipeline for every subscriber). A HashSet also tolerates null
    /// elements, unlike the former x.Equals(y) call.
    /// </remarks>
    public static IObservable<TSource> CollectDuplicates<TSource>(this IObservable<TSource> source) {
        if (source == null) throw new ArgumentNullException(nameof(source));
        return Observable.Create<TSource>(observer => {
            // Per-subscription state: values seen so far.
            var seen = new HashSet<TSource>();
            return source.Subscribe(
                value => {
                    // Add returns false when the value was already present,
                    // i.e. this is a duplicate — forward it.
                    if (!seen.Add(value))
                        observer.OnNext(value);
                },
                observer.OnError,
                observer.OnCompleted);
        });
    }
}

相关内容

  • 没有找到相关文章

最新更新