我有两个数据源:在线和离线(缓存)。它们都返回对象的IObservable,该对象包含两个标志——IsSuccess和IsCached。我想从在线源获取数据,但仅当IsSuccess=true时。如果失败,我想从脱机源获取数据。此外,我想在缓存中保存新数据以备将来使用。我不知道如何在RX中做到最好。这是我的实现,但我认为它可以做得更好
public IObservable<Result<SampleModel>> GetSampleModel()
{
IObservable<Result<SampleModel>> onlineObservable = _onlineSource.GetData<SampleModel>();
IObservable<Result<SampleModel>> offlineObservable = _offlineSource.GetData<SampleModel>();
var subject = new Subject<Result<SampleModel>>();
onlineObservable.Do(async (result) =>
{
if (result.IsSuccess)
{
await _offlineSource.CacheData(result.Data).ConfigureAwait(false);
}
}).Subscribe((result) =>
{
if (result.IsSuccess)
{
subject.OnNext(result);
}
subject.OnCompleted();
});
return subject.Concat(offlineObservable).Take(1);
}
结果类-数据包装器:
public class Result<T>
{
public Result(Exception exception)
{
Exception = exception;
}
public Result(T data, bool isCached = false)
{
IsCached = isCached;
IsSuccess = true;
Data = data;
}
public bool IsSuccess { get; private set; }
public bool IsCached { get; private set; }
public T Data { get; private set; }
public Exception Exception { get; private set; }
}
您的实现将无法可靠地工作,因为其中存在竞争条件。考虑一下:
var temp = GetSampleModel(); // #1
// Do some long operation here
temp.Subscribe(p => Console.WriteLine(p)); // #2
在这种情况下,获取数据将从#1开始,如果数据在#2执行之前被接收并推送到主题,那么无论等待多长时间,都不会打印任何内容。
通常,您应该避免在返回IObservable
的函数内部订阅,以避免出现此类问题。使用Do
也是一种臭味。您可以使用ReplaySubject
或AsyncSubject
修复代码,但在这种情况下,我通常更喜欢Observable.Create
。这是我的重写:
public IObservable<SampleModel> GetSampleModel(IScheduler scheduler = null)
{
scheduler = scheduler ?? TaskPoolScheduler.Default;
return Observable.Create<SampleModel>(observer =>
{
return scheduler.ScheduleAsync(async (s, ct) =>
{
var onlineResult = await _onlineSource.GetData<SampleModel>().FirstAsync();
if (onlineResult.IsSuccess)
{
observer.OnNext(onlineResult.Data);
await _offlineSource.CacheData(onlineResult.Data);
observer.OnCompleted();
}
else
{
var offlineResult = await _offlineSource.GetData<SampleModel>().FirstAsync();
if (offlineResult.IsSuccess)
{
observer.OnNext(offlineResult.Data);
observer.OnCompleted();
}
else
{
observer.OnError(new Exception("Could not receive model"));
}
}
return Disposable.Empty;
});
});
}
你可以看到它仍然不是很漂亮。我认为这是因为您选择不使用处理错误的自然Rx系统,而是使用Result
类型包装您的值。如果您更改存储库方法以Rx方式处理错误,则生成的代码会更加简洁。(请注意,我将您的Result
类型更改为MaybeCached
,并且我假设现在两个源都返回IObservable<SampleModel>
,这是一个冷可观察的结果,返回单个结果或错误):
public class MaybeCached<T>
{
public MaybeCached(T data, bool isCached)
{
IsCached = isCached;
IsSuccess = true;
}
public bool IsCached { get; private set; }
public T Data { get; private set; }
}
public IObservable<SampleModel> GetSampleModel()
{
_onlineSource
.GetData<SampleModel>()
.Select(d => new MaybeCached(d, false))
.Catch(_offlineSource
.GetData<SampleModel>()
.Select(d => new MaybeCached(d, true))
.SelectMany(data => data.IsCached ? Observable.Return(data.Data) : _offlineSource.CacheData(data.Data).Select(_ => data.Data));
}
此处使用Catch
是为了获得您要求的条件切换。