我正在尝试将日志文件加载为流,以便我可以通过网络让实时日志零售商工作。
它应该从日志文件加载迄今为止的历史行,然后为每个保存的新行触发更新。
为了减少 signalR 流量,我使用 RX 缓冲区将它们批处理成大量 100 行,但是在加载初始文件内容时这是一个问题 - 可能是 100k 行。以 100 个为批次加载太慢。初始文件内容应作为单个消息发送。
我真正想要的是首先在 Observable 上发送一个勾号,其中包含迄今为止文件的全部内容,然后从那时开始,为新行写入触发缓冲更新。但我不确定如何将初始内容作为单个即时报价通过,然后从那时开始缓冲。
到目前为止我的代码
var watcherSubject = new ReplaySubject<LogTailMessage>()
var watcher = new logFileWatcher(logFileLocation)
new TaskFactory().StartNew(() => watcher.StartFileWatch(data => watcherSubject.OnNext(data), CancellationToken.None));
Stream = watcherSubject
.Buffer(TimeSpan.FromMilliseconds(500), 100)
.Where(d => d != null)
.Replay()
.RefCount();
更新的解决方案
var initialFileLines = watcher.GetInitialData();
new TaskFactory().StartNew(() => watcher.StartFileWatcher(data => watcherSubject.OnNext(data), _cts.Token));
Stream = watcherSubject.Buffer(TimeSpan.FromMilliseconds(500), 100)
.StartWith(initialFileLines)
.Replay()
.RefCount();
使用 StartWith
:
var originalFileLines = new List<LogTailMessage>(); //Initialize with file contents.
Stream = watcherSubject
.Buffer(TimeSpan.FromMilliseconds(500), 100)
.Where(d => d != null)
.StartWith(originalFileLines)
.Replay()
.RefCount();
更新:我不确定为什么StartWith
不能可靠地工作。你能用一个模拟的例子修改答案吗?
.Concat
应该有效,尽管我认为这基本上正是StartWith
应该做的。
Stream = Observable.Return(originalFileLines).Concat(
watcherSubject
.Buffer(TimeSpan.FromMilliseconds(500), 100)
.Where(d => d != null)
.Replay()
.RefCount()
);