建议在长时间运行的过程中使用rx distinct



在一个长时间运行的进程中,我正在使用rx distinct操作符基于某个键来过滤外部数据流。

这会导致内存泄漏吗?假设会收到很多不同的密钥。rx不同的操作符如何跟踪以前收到的键?

我应该使用groupbyuntil与持续时间选择器代替?

Observable.Distinct在内部使用HashSet。内存使用将大致与遇到的不同键的数量成正比。(大概30*n字节)

GroupByUntil的行为与Distinct完全不同。
GroupByUntil (well)分组,而Distinct过滤流的元素。

不确定预期的用途,但是如果你只是想过滤连续的相同的元素,你需要Observable.DistinctUntilChanged,它的内存占用与键的数量无关。

这可能是一个有争议的策略,但是如果您担心不同键的积累,并且如果存在一个可以安全地重置的时间点,那么您可以使用Observable.Switch引入重置策略。例如,我们有一个"世界状态"每天重置的场景,所以我们可以每天重置不同的可观察到的。

Observable.Create<MyPoco>(
    observer =>
    {
        var distinctPocos = new BehaviorSubject<IObservable<MyPoco>>(pocos.Distinct(x => x.Id));
        var timerSubscription =
            Observable.Timer(
                new DateTimeOffset(DateTime.UtcNow.Date.AddDays(1)),
                TimeSpan.FromDays(1),
                schedulerService.Default).Subscribe(
                    t =>
                    {
                        Log.Info("Daily reset - resetting distinct subscription.");
                        distinctPocos.OnNext(pocos.Distinct(x => x.Id));
                    });
        var pocoSubscription = distinctPocos.Switch().Subscribe(observer);
        return new CompositeDisposable(timerSubscription, pocoSubscription);
    });

然而,我确实倾向于同意James World上面关于使用内存分析器进行测试的评论,以便在引入潜在不必要的复杂性之前检查内存确实是一个问题。如果累积32位整数作为键,那么在大多数平台上遇到内存问题之前,您将拥有数百万个唯一项。例如,262144个32位int键将占用1兆字节。根据您的场景,可能在此之前很久就重置了进程。

相关内容

  • 没有找到相关文章

最新更新