如何在 RX 中有条件地缓冲



我正在尝试将日志文件加载为流,以便我可以通过网络让实时日志零售商工作。

它应该从日志文件加载迄今为止的历史行,然后为每个保存的新行触发更新。

为了减少 signalR 流量,我使用 RX 缓冲区将它们批处理成大量 100 行,但是在加载初始文件内容时这是一个问题 - 可能是 100k 行。以 100 个为批次加载太慢。初始文件内容应作为单个消息发送。

我真正想要的是首先在 Observable 上发送一个勾号,其中包含迄今为止文件的全部内容,然后从那时开始,为新行写入触发缓冲更新。但我不确定如何将初始内容作为单个即时报价通过,然后从那时开始缓冲。

到目前为止我的代码

var watcherSubject = new ReplaySubject<LogTailMessage>()
var watcher = new logFileWatcher(logFileLocation)
new TaskFactory().StartNew(() => watcher.StartFileWatch(data => watcherSubject.OnNext(data), CancellationToken.None));
Stream = watcherSubject
    .Buffer(TimeSpan.FromMilliseconds(500), 100)
    .Where(d => d != null)
    .Replay()
    .RefCount();

更新的解决方案

var initialFileLines = watcher.GetInitialData();
new TaskFactory().StartNew(() => watcher.StartFileWatcher(data => watcherSubject.OnNext(data), _cts.Token));
Stream = watcherSubject.Buffer(TimeSpan.FromMilliseconds(500), 100)
    .StartWith(initialFileLines)
    .Replay()
    .RefCount();

使用 StartWith

var originalFileLines = new List<LogTailMessage>(); //Initialize with file contents.
Stream = watcherSubject
    .Buffer(TimeSpan.FromMilliseconds(500), 100)
    .Where(d => d != null)
    .StartWith(originalFileLines)
    .Replay()
    .RefCount();

更新:我不确定为什么StartWith不能可靠地工作。你能用一个模拟的例子修改答案吗?

.Concat应该有效,尽管我认为这基本上正是StartWith应该做的。

Stream = Observable.Return(originalFileLines).Concat(
    watcherSubject
        .Buffer(TimeSpan.FromMilliseconds(500), 100)
        .Where(d => d != null)
        .Replay()
        .RefCount()
   );

相关内容

  • 没有找到相关文章

最新更新