Rx-反应扩展-从第一个可观测到第二个的条件切换



我有两个数据源:在线和离线(缓存)。它们都返回对象的IObservable,该对象包含两个标志——IsSuccess和IsCached。我想从在线源获取数据,但仅当IsSuccess=true时。如果失败,我想从脱机源获取数据。此外,我想在缓存中保存新数据以备将来使用。我不知道如何在RX中做到最好。这是我的实现,但我认为它可以做得更好

public IObservable<Result<SampleModel>> GetSampleModel()
    {
        IObservable<Result<SampleModel>> onlineObservable = _onlineSource.GetData<SampleModel>();
        IObservable<Result<SampleModel>> offlineObservable = _offlineSource.GetData<SampleModel>();
        var subject = new Subject<Result<SampleModel>>();
        onlineObservable.Do(async (result) =>
        {
            if (result.IsSuccess)
            {
                await _offlineSource.CacheData(result.Data).ConfigureAwait(false);
            }
        }).Subscribe((result) =>
        {
            if (result.IsSuccess)
            {
                subject.OnNext(result);
            }
            subject.OnCompleted();
        });
        return subject.Concat(offlineObservable).Take(1);
    }

结果类-数据包装器:

public class Result<T>
{
    public Result(Exception exception)
    {
        Exception = exception;
    }
    public Result(T data, bool isCached = false)
    {
        IsCached = isCached;
        IsSuccess = true;
        Data = data;
    }
    public bool IsSuccess { get; private set; }
    public bool IsCached { get; private set; }
    public T Data { get; private set; }
    public Exception Exception { get; private set; }
}

您的实现将无法可靠地工作,因为其中存在竞争条件。考虑一下:

var temp = GetSampleModel(); // #1
// Do some long operation here
temp.Subscribe(p => Console.WriteLine(p)); // #2

在这种情况下,获取数据将从#1开始,如果数据在#2执行之前被接收并推送到主题,那么无论等待多长时间,都不会打印任何内容。

通常,您应该避免在返回IObservable的函数内部订阅,以避免出现此类问题。使用Do也是一种臭味。您可以使用ReplaySubjectAsyncSubject修复代码,但在这种情况下,我通常更喜欢Observable.Create。这是我的重写:

public IObservable<SampleModel> GetSampleModel(IScheduler scheduler = null)
{
    scheduler = scheduler ?? TaskPoolScheduler.Default;
    return Observable.Create<SampleModel>(observer =>
    {
        return scheduler.ScheduleAsync(async (s, ct) =>
        {
            var onlineResult = await _onlineSource.GetData<SampleModel>().FirstAsync();
            if (onlineResult.IsSuccess)
            {
                observer.OnNext(onlineResult.Data);
                await _offlineSource.CacheData(onlineResult.Data);
                observer.OnCompleted();
            }
            else
            {
                var offlineResult = await _offlineSource.GetData<SampleModel>().FirstAsync();
                if (offlineResult.IsSuccess)
                {
                    observer.OnNext(offlineResult.Data);
                    observer.OnCompleted();
                }
                else
                {
                    observer.OnError(new Exception("Could not receive model"));
                }
            }
            return Disposable.Empty;
        });
    });
}

你可以看到它仍然不是很漂亮。我认为这是因为您选择不使用处理错误的自然Rx系统,而是使用Result类型包装您的值。如果您更改存储库方法以Rx方式处理错误,则生成的代码会更加简洁。(请注意,我将您的Result类型更改为MaybeCached,并且我假设现在两个源都返回IObservable<SampleModel>,这是一个冷可观察的结果,返回单个结果或错误):

public class MaybeCached<T>
{
    public MaybeCached(T data, bool isCached)
    {
        IsCached = isCached;
        IsSuccess = true;
    }
    public bool IsCached { get; private set; }
    public T Data { get; private set; }
}
public IObservable<SampleModel> GetSampleModel()
{
    _onlineSource
        .GetData<SampleModel>()
        .Select(d => new MaybeCached(d, false))
        .Catch(_offlineSource
                    .GetData<SampleModel>()
                    .Select(d => new MaybeCached(d, true))
        .SelectMany(data => data.IsCached ? Observable.Return(data.Data) : _offlineSource.CacheData(data.Data).Select(_ => data.Data));
}

此处使用Catch是为了获得您要求的条件切换。

相关内容

  • 没有找到相关文章

最新更新