Rx.Net-Publish方法在订阅Cold Observable时缺少前几项



受Akavache的启发,我正试图创建一个为我提供IObservable<IArticle>的解决方案。该方法本质上首先尝试获取数据库中存在的所有文章,然后尝试从Web服务获取更新的文章,并且在从Web服务中获取最新文章时,尝试将它们保存回数据库。

由于Web服务本质上是一个冷可观察的服务,我不想订阅两次,所以我使用了Publish来连接它。我的理解是,我使用的是Publish方法的正确版本,然而,很多时候该方法往往会错过GetNewsArticles中的前几篇文章。这是通过UI以及下面调用中添加的Trace调用观察到的。

除了解决这个问题外,还可以了解如何调试/测试此代码(除了引入DI来注入NewsService(。

public IObservable<IArticle> GetContents(string newsUrl, IScheduler scheduler)
{
var newsService = new NewsService(new HttpClient());
scheduler = scheduler ?? TaskPoolScheduler.Default;
var fetchObject = newsService
.GetNewsArticles(newsUrl,scheduler)
.Do(x => Trace.WriteLine($"Parsing Articles {x.Title}"));
return fetchObject.Publish(fetchSubject =>
{
var updateObs = fetchSubject
.Do( x =>                         
{
// Save to database, all sync calls
})
.Where(x => false)
.Catch(Observable.Empty<Article>());
var dbArticleObs = Observable.Create<IArticle>(o =>
{
return scheduler.ScheduleAsync(async (ctrl, ct) =>
{
using (var session = dataBase.GetSession())
{
var articles = await session.GetArticlesAsync(newsUrl, ct);
foreach (var article in articles)
{
o.OnNext(article);
}
}
o.OnCompleted();
});
});
return
dbArticleObs                // First get all the articles from dataBase cache
.Concat(fetchSubject    // Get the latest articles from web service 
.Catch(Observable.Empty<Article>())
.Merge(updateObs))  // Update the database with latest articles
.Do(x => Trace.WriteLine($"Displaying {x.Title}"));
});
}

更新-添加GetArticles

public IObservable<IContent> GetArticles(string feedUrl, IScheduler scheduler)
{
return Observable.Create<IContent>(o =>
{
scheduler = scheduler ?? DefaultScheduler.Instance;
scheduler.ScheduleAsync(async (ctrl, ct) =>
{
try
{
using (var inputStream = await Client.GetStreamAsync(feedUrl))
{
var settings = new XmlReaderSettings
{
IgnoreComments = true,
IgnoreProcessingInstructions = true,
IgnoreWhitespace = true,
Async = true
};
//var parsingState = ParsingState.Channel;
Article article = null;
Feed feed = null;
using (var reader = XmlReader.Create(inputStream, settings))
{
while (await reader.ReadAsync())
{
ct.ThrowIfCancellationRequested();
if (reader.IsStartElement())
{
switch (reader.LocalName)
{
...
// parsing logic goes here
...
}
}
else if (reader.LocalName == "item" &&
reader.NodeType == XmlNodeType.EndElement)
{
o.OnNext(article);
}
}
}
o.OnCompleted();
}
}
catch (Exception e)
{
o.OnError(e);
}
});
return Disposable.Empty;
});
}
UPDATE 2

在此处共享源代码链接。

我不喜欢您的代码。我假设NewsServiceIDisposable,因为它需要HttpClient(它是一次性的(。你没有进行适当的清理。

此外,您还没有提供一个完整的方法——因为您已经尝试过为这个问题减少它——但这使得很难对如何重写代码进行推理。

也就是说,有一件事让我觉得很可怕,那就是Observable.Create。你能试一下这个代码吗?看看它是否有助于你的工作?

var dbArticleObs =
Observable
.Using(
() => dataBase.GetSession(),
session =>
from articles in Observable.FromAsync(ct => session.GetArticlesAsync(newsUrl, ct))
from article in articles
select article);

现在,如果是这样,请尝试在新建`NewService时重写fetchObject以使用相同的Observable.Using

无论如何,如果您能在问题中提供GetContentsNewsServicedataBase代码的完整实现,那就太好了。

相关内容

最新更新