.NET Rx-ReplaySubject缓冲区大小不起作用



我一直在使用.NET Reactive Extensions来观察日志事件。我目前正在使用一个从IObservable派生的类,该类使用ReplaySubject来存储日志,这样我就可以过滤和回放日志(例如:显示所有错误日志,或显示所有详细日志),而不会丢失我缓冲的日志。

问题是,尽管我已经在这个主题上设置了缓冲区大小:

this.subject = new ReplaySubject<LogEvent>(10);

当我使用OnNext添加到无限循环上的可观察集合时,我的程序的内存使用率达到了极限:

internal void WatchForNewEvents()
        {
            Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        dynamic parameters = new ExpandoObject();
                        // TODO: Add parameters for getting specific log events
                        if (this.logEventRepository.GetManyHasNewResults(parameters))
                        {
                            foreach (var recentEvent in this.logEventRepository.GetMany(parameters))
                            {
                                this.subject.OnNext(recentEvent);
                            }
                        }
                        // Commented this out for now to really see the memory go up 
                        // Thread.Sleep(1000); 
                    }
                });
        }

ReplaySubject上的缓冲区大小不起作用吗?当达到缓冲区大小时,它似乎并没有清除缓冲区。非常感谢您的帮助!

更新:

我添加这样的订阅者(这是错误的吗?):

public IDisposable Subscribe(IObserver<LogEvent> observer)
        {
            return this.subject.Subscribe(observer);
        }

它被称为:

// Inserts into UI ListView
    this.logEventObservable.Subscribe(evt => this.InsertNewLogEvent(evt));

我不确定这是否是最终的答案,但我怀疑您遇到了问题,因为您使用的调度器存在并发性。您在ReplaySubject上调用的构造函数如下所示:

public ReplaySubject(int bufferSize)
    : this(bufferSize, TimeSpan.MaxValue, Scheduler.CurrentThread)
{ }

Scheduler.CurrentThread让我担心。试着把它改成Scheduler.ThreadPool,看看这是否有帮助。

此外,顺便说一句,你似乎把Rx和TPL以及老式的线程睡眠混合在一起。通常最好避免那样做。您可以将WatchForNewEvents代码更改为如下所示:

dynamic parameters = new ExpandoObject();
var newEvents =
    from n in Observable.Interval(TimeSpan.FromSeconds(1.0))
    where this.logEventRepository.GetManyHasNewResults(parameters)
    from recentEvent in
        this.logEventRepository.GetMany(parameters).ToObservable()
    select recentEvent;
newEvents.Subscribe(this.subject);

这是一种很好的紧凑的Rx-y做事方式。

最新更新