在一个长时间运行的进程中,我正在使用rx distinct操作符基于某个键来过滤外部数据流。
这会导致内存泄漏吗?假设会收到很多不同的密钥。rx不同的操作符如何跟踪以前收到的键?
我应该使用groupbyuntil与持续时间选择器代替?
Observable.Distinct
在内部使用HashSet
。内存使用将大致与遇到的不同键的数量成正比。(大概30*n字节)
GroupByUntil
的行为与Distinct
完全不同。GroupByUntil
(well)分组,而Distinct
过滤流的元素。
不确定预期的用途,但是如果你只是想过滤连续的相同的元素,你需要Observable.DistinctUntilChanged
,它的内存占用与键的数量无关。
这可能是一个有争议的策略,但是如果您担心不同键的积累,并且如果存在一个可以安全地重置的时间点,那么您可以使用Observable.Switch引入重置策略。例如,我们有一个"世界状态"每天重置的场景,所以我们可以每天重置不同的可观察到的。
Observable.Create<MyPoco>(
observer =>
{
var distinctPocos = new BehaviorSubject<IObservable<MyPoco>>(pocos.Distinct(x => x.Id));
var timerSubscription =
Observable.Timer(
new DateTimeOffset(DateTime.UtcNow.Date.AddDays(1)),
TimeSpan.FromDays(1),
schedulerService.Default).Subscribe(
t =>
{
Log.Info("Daily reset - resetting distinct subscription.");
distinctPocos.OnNext(pocos.Distinct(x => x.Id));
});
var pocoSubscription = distinctPocos.Switch().Subscribe(observer);
return new CompositeDisposable(timerSubscription, pocoSubscription);
});
然而,我确实倾向于同意James World上面关于使用内存分析器进行测试的评论,以便在引入潜在不必要的复杂性之前检查内存确实是一个问题。如果累积32位整数作为键,那么在大多数平台上遇到内存问题之前,您将拥有数百万个唯一项。例如,262144个32位int键将占用1兆字节。根据您的场景,可能在此之前很久就重置了进程。