我有一个可观察的监视对象,该日志也在不断写入。每行都是一个新的下一个调用。有时,日志会跨多行输出单个日志项。检测这很容易,我只是找不到正确的 RX 调用。
我想找到一种方法将单个日志项收集到行列表中,并在单个日志项完成后在下一个列表中。
Buffer
似乎不对,因为这不是基于时间的,而是基于算法的。
GroupBy
可能是我想要的,但文档对此感到困惑。似乎它创建的可观察量在源可观察量完成之前可能不会调用 onComplete。
此解决方案不会延迟日志太多(最好完全不延迟)。我需要尽可能接近实时地阅读日志,订单很重要。
任何朝着正确方向的推动都会很棒。
这是一个典型的反应式解析问题。您可以使用 Rxx 解析器,或者对于本机解决方案,您可以使用 Scan
或通过定义异步迭代器来构建自己的状态机。 Scan
对于简单的解析器更可取,并且通常使用Scan-Where-Select
模式。
异步迭代器状态机示例:十字转门
Scan
解析器示例(未经测试):
IObservable<string> lines = ReadLines();
IObservable<IReadOnlyList<string>> parsed = lines.Scan(
new
{
ParsingItem = (IEnumerable<string>)null,
Item = (IEnumerable<string>)null
},
(state, line) =>
// I'm assuming here that items never span lines partially.
IsItem(line)
? IsItemLastLine(line)
? new
{
ParsingItem = (IEnumerable<string>)null,
Item = (state.ParsingItem ?? Enumerable.Empty<string>()).Concat(line)
}
: new
{
ParsingItem = (state.ParsingItem ?? Enumerable.Empty<string>()).Concat(line),
Item = (List<string>)null
}
: new
{
ParsingItem = (IEnumerable<string>)null,
Item = new[] { line }
})
.Where(result => result.Item != null)
.Select(result => result.Item.ToList().AsReadOnly());