批处理要发送到 RX 的文件流读取器



我想逐行将文件的内容加载到流中,然后坐在那里每秒观察文件中的新条目 - 因此,实时文件读取器将输出通过管道传输到 RX。

我通过一次读取一行来实现这一点,如果 readline(( 上有数据,则通过传入的操作回调,该操作在调用者中将数据放在 RX 订阅者的重播主题上。

问题是这一次只在 RX 流上发回一行。我想将它们批处理起来,这样它就不会回调,直到您说出要发回的 10 个项目,或者已经过去了某个时间 - 例如 5-10 秒。

我的回调是一个数据的集合,现在我已经硬编码为只返回集合中的单个项目,因为我无法弄清楚如何进行基于时间的批处理。

谁能建议如何实现这一目标?

到目前为止我的代码

public void StartFileWatcher(Action<LogTailMessage[]> callbackAction, CancellationToken cancellationToken)
        {
            var wh = new AutoResetEvent(false);
            var fsw = new FileSystemWatcher(_path)
            {
                Filter = _file,
                EnableRaisingEvents = true
            };
            fsw.Changed += (s, e) => wh.Set();
            var lineNumber = 1;
            var fs = new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
            using (var sr = new StreamReader(fs))
            {
                while (!cancellationToken.IsCancellationRequested && !_isCancelled)
                {
                    var s = sr.ReadLine();
                    if (s != null)
                    {
                        //todo - batch these up so we only call back once we have 10 items, or if a certain amount of time has passed, send what we have
                        callbackAction(new [] {new LogTailMessage(lineNumber, s)});
                        lineNumber++;
                    }
                    else
                        wh.WaitOne(1000);
                }
            }
        }

更新:缓冲溶液

var watcherSubject = new ReplaySubject<LogTailMessage>();
            var watcher = new LogFileWatcher(path, filename);
            new TaskFactory().StartNew(() => watcher.StartFileWatcher(data => watcherSubject.OnNext(data), _cts.Token));
            Stream = watcherSubject
                .Buffer(TimeSpan.FromMilliseconds(500), 20)
                .Where(d => d != null)
                .Replay()
                .RefCount();

和文件观察程序

public void StartFileWatcher(Action<LogTailMessage> callbackAction, CancellationToken cancellationToken)
        {
            var wh = new AutoResetEvent(false);
            var fsw = new FileSystemWatcher(_path)
            {
                Filter = _file,
                EnableRaisingEvents = true
            };
            fsw.Changed += (s, e) => wh.Set();
            var fileName = Path.Combine(_path, _file);
            var startLine = GetFileStartLine(fileName);
            var lineNumber = 1;
            var fs = new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
            using (var sr = new StreamReader(fs))
            {
                while (!cancellationToken.IsCancellationRequested && !_isCancelled)
                {
                    var s = sr.ReadLine();
                    if (s != null)
                    {
                        if (lineNumber >= startLine)
                            callbackAction(new LogTailMessage(lineNumber, s));
                        lineNumber++;
                    }
                    else
                    {
                        wh.WaitOne(1000);
                    }
                }
            }
        }

你在原始代码中做了很多你真的不需要的工作,并且你正在创建未被清理的一次性物品和事件处理程序。

你真的可以在几个可观察量中完成整个事情。

首先,您需要观察文件中的更改。 方法如下:

IObservable<Unit> fileSystemWatcherChanges =
    Observable
        .Using(() =>
            new FileSystemWatcher(_path)
            {
                Filter = _file,
                EnableRaisingEvents = true
            },
            fsw =>
                Observable
                    .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                        h => fsw.Changed += h, h => fsw.Changed -= h)
                    .Select(x => Unit.Default));

现在,您需要打开一个流并在每次文件更改时从流中读取:

IObservable<LogTailMessage> messages =
    Observable
        .Using(
            () => new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite),
            fs =>
                Observable
                    .Using(
                        () => new StreamReader(fs),
                        sr =>
                            fileSystemWatcherChanges
                                .StartWith(Unit.Default)
                                .Select(x =>
                                    Observable
                                        .Defer(() => Observable.FromAsync(() => sr.ReadLineAsync()))
                                        .Repeat()
                                        .TakeUntil(w => w == null))
                                .Merge()
                                .Where(w => w != null)))
        .Select((x, n) => new LogTailMessage(n, x));
IObservable<IList<LogTailMessage>> buffered =
    messages
        .Buffer(TimeSpan.FromSeconds(5), 10);

我在计算机上对此进行了测试,我相信它提供了您需要的结果。

这是一个完整的 Rx 管道,所以如果你像IDisposable subscription = buffered.Subscribe();一样订阅,后来你打电话给subscription.Dispose();那么它都会自行清理。

它避免了主题。

您可以在该主题上使用Buffer

var subject = ReplaySubject<LogTailMessage>();
StartFileWatcher(a => a.ToList().ForEach(ltm => subject.OnNext(ltm)), CancellationToken.None);
bufferedSubject = subject.Buffer (TimeSpan.FromSeconds(5), 10);

相关内容

  • 没有找到相关文章

最新更新