合并流的各个部分



我有一个可观察的监视对象,该日志也在不断写入。每行都是一个新的下一个调用。有时,日志会跨多行输出单个日志项。检测这很容易,我只是找不到正确的 RX 调用。

我想找到一种方法将单个日志项收集到行列表中,并在单个日志项完成后在下一个列表中。

Buffer似乎不对,因为这不是基于时间的,而是基于算法的。

GroupBy可能是我想要的,但文档对此感到困惑。似乎它创建的可观察量在源可观察量完成之前可能不会调用 onComplete。

此解决方案不会延迟日志太多(最好完全不延迟)。我需要尽可能接近实时地阅读日志,订单很重要。

任何朝着正确方向的推动都会很棒。

这是一个典型的反应式解析问题。您可以使用 Rxx 解析器,或者对于本机解决方案,您可以使用 Scan 或通过定义异步迭代器来构建自己的状态机。 Scan 对于简单的解析器更可取,并且通常使用Scan-Where-Select模式。

异步迭代器状态机示例:十字转门

Scan解析器示例(未经测试):

IObservable<string> lines = ReadLines();
IObservable<IReadOnlyList<string>> parsed = lines.Scan(
  new
  {
    ParsingItem = (IEnumerable<string>)null,
    Item = (IEnumerable<string>)null
  },
  (state, line) =>
    // I'm assuming here that items never span lines partially.
    IsItem(line)
    ? IsItemLastLine(line)
      ? new
        {
          ParsingItem = (IEnumerable<string>)null,
          Item = (state.ParsingItem ?? Enumerable.Empty<string>()).Concat(line)
        }
      : new
        {
          ParsingItem = (state.ParsingItem ?? Enumerable.Empty<string>()).Concat(line),
          Item = (List<string>)null
        }
    : new
      {
        ParsingItem = (IEnumerable<string>)null,
        Item = new[] { line }
      })
  .Where(result => result.Item != null)
  .Select(result => result.Item.ToList().AsReadOnly());

相关内容

  • 没有找到相关文章

最新更新