我想逐行将文件的内容加载到流中,然后坐在那里每秒观察文件中的新条目 - 因此,实时文件读取器将输出通过管道传输到 RX。
我通过一次读取一行来实现这一点,如果 readline(( 上有数据,则通过传入的操作回调,该操作在调用者中将数据放在 RX 订阅者的重播主题上。
问题是这一次只在 RX 流上发回一行。我想将它们批处理起来,这样它就不会回调,直到您说出要发回的 10 个项目,或者已经过去了某个时间 - 例如 5-10 秒。
我的回调是一个数据的集合,现在我已经硬编码为只返回集合中的单个项目,因为我无法弄清楚如何进行基于时间的批处理。
谁能建议如何实现这一目标?
到目前为止我的代码
public void StartFileWatcher(Action<LogTailMessage[]> callbackAction, CancellationToken cancellationToken)
{
var wh = new AutoResetEvent(false);
var fsw = new FileSystemWatcher(_path)
{
Filter = _file,
EnableRaisingEvents = true
};
fsw.Changed += (s, e) => wh.Set();
var lineNumber = 1;
var fs = new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
using (var sr = new StreamReader(fs))
{
while (!cancellationToken.IsCancellationRequested && !_isCancelled)
{
var s = sr.ReadLine();
if (s != null)
{
//todo - batch these up so we only call back once we have 10 items, or if a certain amount of time has passed, send what we have
callbackAction(new [] {new LogTailMessage(lineNumber, s)});
lineNumber++;
}
else
wh.WaitOne(1000);
}
}
}
更新:缓冲溶液
var watcherSubject = new ReplaySubject<LogTailMessage>();
var watcher = new LogFileWatcher(path, filename);
new TaskFactory().StartNew(() => watcher.StartFileWatcher(data => watcherSubject.OnNext(data), _cts.Token));
Stream = watcherSubject
.Buffer(TimeSpan.FromMilliseconds(500), 20)
.Where(d => d != null)
.Replay()
.RefCount();
和文件观察程序
public void StartFileWatcher(Action<LogTailMessage> callbackAction, CancellationToken cancellationToken)
{
var wh = new AutoResetEvent(false);
var fsw = new FileSystemWatcher(_path)
{
Filter = _file,
EnableRaisingEvents = true
};
fsw.Changed += (s, e) => wh.Set();
var fileName = Path.Combine(_path, _file);
var startLine = GetFileStartLine(fileName);
var lineNumber = 1;
var fs = new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
using (var sr = new StreamReader(fs))
{
while (!cancellationToken.IsCancellationRequested && !_isCancelled)
{
var s = sr.ReadLine();
if (s != null)
{
if (lineNumber >= startLine)
callbackAction(new LogTailMessage(lineNumber, s));
lineNumber++;
}
else
{
wh.WaitOne(1000);
}
}
}
}
你在原始代码中做了很多你真的不需要的工作,并且你正在创建未被清理的一次性物品和事件处理程序。
你真的可以在几个可观察量中完成整个事情。
首先,您需要观察文件中的更改。 方法如下:
IObservable<Unit> fileSystemWatcherChanges =
Observable
.Using(() =>
new FileSystemWatcher(_path)
{
Filter = _file,
EnableRaisingEvents = true
},
fsw =>
Observable
.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => fsw.Changed += h, h => fsw.Changed -= h)
.Select(x => Unit.Default));
现在,您需要打开一个流并在每次文件更改时从流中读取:
IObservable<LogTailMessage> messages =
Observable
.Using(
() => new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite),
fs =>
Observable
.Using(
() => new StreamReader(fs),
sr =>
fileSystemWatcherChanges
.StartWith(Unit.Default)
.Select(x =>
Observable
.Defer(() => Observable.FromAsync(() => sr.ReadLineAsync()))
.Repeat()
.TakeUntil(w => w == null))
.Merge()
.Where(w => w != null)))
.Select((x, n) => new LogTailMessage(n, x));
IObservable<IList<LogTailMessage>> buffered =
messages
.Buffer(TimeSpan.FromSeconds(5), 10);
我在计算机上对此进行了测试,我相信它提供了您需要的结果。
这是一个完整的 Rx 管道,所以如果你像IDisposable subscription = buffered.Subscribe();
一样订阅,后来你打电话给subscription.Dispose();
那么它都会自行清理。
它避免了主题。
您可以在该主题上使用Buffer
:
var subject = ReplaySubject<LogTailMessage>();
StartFileWatcher(a => a.ToList().ForEach(ltm => subject.OnNext(ltm)), CancellationToken.None);
bufferedSubject = subject.Buffer (TimeSpan.FromSeconds(5), 10);