受Akavache的启发,我正试图创建一个为我提供IObservable<IArticle>
的解决方案。该方法本质上首先尝试获取数据库中存在的所有文章,然后尝试从Web服务获取更新的文章,并且在从Web服务中获取最新文章时,尝试将它们保存回数据库。
由于Web服务本质上是一个冷可观察的服务,我不想订阅两次,所以我使用了Publish
来连接它。我的理解是,我使用的是Publish
方法的正确版本,然而,很多时候该方法往往会错过GetNewsArticles
中的前几篇文章。这是通过UI以及下面调用中添加的Trace
调用观察到的。
除了解决这个问题外,还可以了解如何调试/测试此代码(除了引入DI来注入NewsService
(。
public IObservable<IArticle> GetContents(string newsUrl, IScheduler scheduler)
{
var newsService = new NewsService(new HttpClient());
scheduler = scheduler ?? TaskPoolScheduler.Default;
var fetchObject = newsService
.GetNewsArticles(newsUrl,scheduler)
.Do(x => Trace.WriteLine($"Parsing Articles {x.Title}"));
return fetchObject.Publish(fetchSubject =>
{
var updateObs = fetchSubject
.Do( x =>
{
// Save to database, all sync calls
})
.Where(x => false)
.Catch(Observable.Empty<Article>());
var dbArticleObs = Observable.Create<IArticle>(o =>
{
return scheduler.ScheduleAsync(async (ctrl, ct) =>
{
using (var session = dataBase.GetSession())
{
var articles = await session.GetArticlesAsync(newsUrl, ct);
foreach (var article in articles)
{
o.OnNext(article);
}
}
o.OnCompleted();
});
});
return
dbArticleObs // First get all the articles from dataBase cache
.Concat(fetchSubject // Get the latest articles from web service
.Catch(Observable.Empty<Article>())
.Merge(updateObs)) // Update the database with latest articles
.Do(x => Trace.WriteLine($"Displaying {x.Title}"));
});
}
更新-添加GetArticles
public IObservable<IContent> GetArticles(string feedUrl, IScheduler scheduler)
{
return Observable.Create<IContent>(o =>
{
scheduler = scheduler ?? DefaultScheduler.Instance;
scheduler.ScheduleAsync(async (ctrl, ct) =>
{
try
{
using (var inputStream = await Client.GetStreamAsync(feedUrl))
{
var settings = new XmlReaderSettings
{
IgnoreComments = true,
IgnoreProcessingInstructions = true,
IgnoreWhitespace = true,
Async = true
};
//var parsingState = ParsingState.Channel;
Article article = null;
Feed feed = null;
using (var reader = XmlReader.Create(inputStream, settings))
{
while (await reader.ReadAsync())
{
ct.ThrowIfCancellationRequested();
if (reader.IsStartElement())
{
switch (reader.LocalName)
{
...
// parsing logic goes here
...
}
}
else if (reader.LocalName == "item" &&
reader.NodeType == XmlNodeType.EndElement)
{
o.OnNext(article);
}
}
}
o.OnCompleted();
}
}
catch (Exception e)
{
o.OnError(e);
}
});
return Disposable.Empty;
});
}
UPDATE 2在此处共享源代码链接。
我不喜欢您的代码。我假设NewsService
是IDisposable
,因为它需要HttpClient
(它是一次性的(。你没有进行适当的清理。
此外,您还没有提供一个完整的方法——因为您已经尝试过为这个问题减少它——但这使得很难对如何重写代码进行推理。
也就是说,有一件事让我觉得很可怕,那就是Observable.Create
。你能试一下这个代码吗?看看它是否有助于你的工作?
var dbArticleObs =
Observable
.Using(
() => dataBase.GetSession(),
session =>
from articles in Observable.FromAsync(ct => session.GetArticlesAsync(newsUrl, ct))
from article in articles
select article);
现在,如果是这样,请尝试在新建`NewService时重写fetchObject
以使用相同的Observable.Using
。
无论如何,如果您能在问题中提供GetContents
、NewsService
和dataBase
代码的完整实现,那就太好了。