我有一个IObservable<string>
,它包含XML文档(的片段)。我想把一个变成另一个。例如,假设我有以下从IObservable<string>
推送的片段(每行包含一个片段):
<?xml version=
"1.0" ?>
<testXml></test
Xml><?xml version="1.0"
?><otherXml /><?xm
如何将其变形为IObservable<XDocument>
以获得以下文档:
<?xml version="1.0"?><testXml />
<?xml version="1.0"?><otherXml />
我一直在考虑将IObservable<string>
交给一些阻塞的TextReader
实现,但我认为应该有一个更聪明的解决方案。
这个怎么样:
IObservable<string> splitXmlTokensIntoSeparateLines(string s)
{
// Here, you need to split tokens into separate lines (where 'token'
// is the beginning of an Xml element). This makes it easier down
// the line for the TakeWhile operator.
return new[] { firstPart, secondPart, etc }.ToObservable();
}
bool doesTokenTerminateDocument(string s)
{
// Here, you should return whether the XML represents the end of one
// document
}
var xmlDocuments = stringObservable
.SelectMany(x => splitXmlTokensIntoSeparateLines(x))
.TakeWhile(x => doesTokenTerminateDocument(x))
.Aggregate(new StringBuilder(), (acc, x), acc.Append(x))
.Select(x => {
var ret = new XDocument();
ret.Parse(x.ToString());
return ret;
})
.Repeat()
.TakeUntil(stringObservable.Aggregate(0, (acc, _) => acc));
TakeUntil是一个让它正确终止的黑客——基本上,Repeat会永远保持重新订阅,除非我们告诉它在stringObservable完成时完成。
为什么不能使用Select
运算符?
var xmlObs = stringObs.Select(s => {
var x = new XDocument();
x.Parse(s);
return x;
});