我有一个可观察的查询,它从我想内联解析的流中产生一个IObservable<byte>
。我希望能够根据数据源使用不同的策略来解析来自该序列的离散消息。请记住,我仍然处于RX的向上学习曲线上。我已经想出了一个解决方案,但不确定是否有一种方法可以使用开箱即用的操作符来完成这个任务。
public static IObservable<IList<T>> Parse<T>(
this IObservable<T> source,
Func<IObservable<T>, IObservable<IList<T>>> parsingFunction)
{
return parsingFunction(source);
}
这允许我指定特定数据源所使用的消息框架策略。一个数据源可能由一个或多个字节分隔,而另一个数据源可能由开始和停止块模式分隔,而另一个数据源可能使用长度前缀策略。下面是我定义的一个分隔策略的例子:
public static class MessageParsingFunctions
{
public static Func<IObservable<T>, IObservable<IList<T>>> Delimited<T>(T[] delimiter)
{
if (delimiter == null) throw new ArgumentNullException("delimiter");
if (delimiter.Length < 1) throw new ArgumentException("delimiter must contain at least one element.");
Func<IObservable<T>, IObservable<IList<T>>> parser =
(source) =>
{
var shared = source.Publish().RefCount();
var windowOpen = shared.Buffer(delimiter.Length, 1)
.Where(buffer => buffer.SequenceEqual(delimiter))
.Publish()
.RefCount();
return shared.Buffer(windowOpen)
.Select(bytes =>
bytes
.Take(bytes.Count - delimiter.Length)
.ToList());
};
return parser;
}
}
因此,最终,作为一个例子,我可以以以下方式使用代码来解析序列中任何时候遇到字符串'<EOF>'
的字节模式的离散消息:
var messages = ...operators that surface an IObservable<byte>
.Parse(MessageParsingFunctions.Delimited(Encoding.ASCII.GetBytes("<EOF>")))
...further operators to package discrete messages along with additional metadata
问题:
- 是否有一种更直接的方法来完成这个使用开箱操作符?
- 如果不是,最好只是定义不同的解析函数(即ParseDelimited, ParseLengthPrefixed等)作为本地扩展,而不是有一个更通用的Parse扩展方法,接受解析函数?
提前感谢!
看一下Rxx解析器。这是一个相关的实验室。例如:
IObservable<byte> bytes = ...;
var parsed = bytes.ParseBinary(parser =>
from next in parser
let magicNumber = parser.String(Encoding.UTF8, 3).Where(value => value == "RXX")
let header = from headerLength in parser.Int32
from header in next.Exactly(headerLength)
from headerAsString in header.Aggregate(string.Empty, (s, b) => s + " " + b)
select headerAsString
let message = parser.String(Encoding.UTF8)
let entry = from length in parser.Int32
from data in next.Exactly(length)
from value in data.Aggregate(string.Empty, (s, b) => s + " " + b)
select value
let entries = from count in parser.Int32
from entries in entry.Exactly(count).ToList()
select entries
select from _ in magicNumber.Required("The file's magic number is invalid.")
from h in header.Required("The file's header is invalid.")
from m in message.Required("The file's message is invalid.")
from e in entries.Required("The file's data is invalid.")
select new
{
Header = h,
Message = m,
Entries = e.Aggregate(string.Empty, (acc, cur) => acc + cur + Environment.NewLine)
});