将字符串的IOobservable转换为XDocuments的IOobervable

本文关键字:XDocuments IOobervable IOobservable 字符串 转换 | 更新日期: 2023-09-27 17:58:30

我有一个IObservable<string>,它包含XML文档(的片段)。我想把一个变成另一个。例如,假设我有以下从IObservable<string>推送的片段(每行包含一个片段):

<?xml version=
"1.0" ?>
<testXml></test
Xml><?xml version="1.0"
?><otherXml /><?xm

如何将其变形为IObservable<XDocument>以获得以下文档:

<?xml version="1.0"?><testXml />
<?xml version="1.0"?><otherXml />

我一直在考虑将IObservable<string>交给一些阻塞的TextReader实现,但我认为应该有一个更聪明的解决方案。

将字符串的IOobservable转换为XDocuments的IOobervable

这个怎么样:

IObservable<string> splitXmlTokensIntoSeparateLines(string s)
{
    // Here, you need to split tokens into separate lines (where 'token'
    // is the beginning of an Xml element). This makes it easier down
    // the line for the TakeWhile operator.
    return new[] { firstPart, secondPart, etc }.ToObservable();
}
bool doesTokenTerminateDocument(string s)
{
    // Here, you should return whether the XML represents the end of one 
    // document
}
var xmlDocuments = stringObservable
    .SelectMany(x => splitXmlTokensIntoSeparateLines(x))
    .TakeWhile(x => doesTokenTerminateDocument(x))
    .Aggregate(new StringBuilder(), (acc, x), acc.Append(x))
    .Select(x => {
        var ret = new XDocument();
        ret.Parse(x.ToString());
        return ret;
    })
    .Repeat()
    .TakeUntil(stringObservable.Aggregate(0, (acc, _) => acc));

TakeUntil是一个让它正确终止的黑客——基本上,Repeat会永远保持重新订阅,除非我们告诉它在stringObservable完成时完成。

为什么不能使用Select运算符?

var xmlObs = stringObs.Select(s => {
  var x = new XDocument();
  x.Parse(s);
  return x;
});