作为警告,我是Rx的新手(2周(,并且一直在尝试使用Rx,RxUI和Roland Pheasant的DynamicData。
我有一个服务,它最初从本地持久性加载数据,然后根据某些用户(或系统(指令将联系服务器(示例中的 TriggerServer(以获取其他或替换数据。 我想出的解决方案使用主题,我遇到过许多讨论使用它们的优缺点的网站。 虽然我了解热/冷的基础知识,但它都是基于阅读而不是现实世界。
那么,使用以下内容作为简化版本,这是解决此问题的"正确"方法,还是我在某处没有正确理解某些内容?
注意:我不确定它有多重要,但实际代码取自Xamarin.Forms应用程序,该应用程序使用RxUI,用户输入是ReactiveCommand。
例:
using DynamicData;
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
public class MyService : IDisposable
{
private CompositeDisposable _cleanup;
private Subject<Unit> _serverSubject = new Subject<Unit>();
public MyService()
{
var data = Initialise().Publish();
AllData = data.AsObservableCache();
_cleanup = new CompositeDisposable(AllData, data.Connect());
}
public IObservableCache<MyData, Guid> AllData { get; }
public void TriggerServer()
{
// This is what I'm not sure about...
_serverSubject.OnNext(Unit.Default);
}
private IObservable<IChangeSet<MyData, Guid>> Initialise()
{
return ObservableChangeSet.Create<MyData, Guid>(async cache =>
{
// inital load - is this okay?
cache.AddOrUpdate(await LoadLocalData());
// is this a valid way of doing this?
var sync = _serverSubject.Select(_ => GetDataFromServer())
.Subscribe(async task =>
{
var data = await task.ConfigureAwait(false);
cache.AddOrUpdate(data);
});
return new CompositeDisposable(sync);
}, d=> d.Id);
}
private IObservable<MyData> LoadLocalData()
{
return Observable.Timer(TimeSpan.FromSeconds(3)).Select(_ => new MyData("localdata"));
}
private async Task<MyData> GetDataFromServer()
{
await Task.Delay(2000).ConfigureAwait(true);
return new MyData("serverdata");
}
public void Dispose()
{
_cleanup?.Dispose();
}
}
public class MyData
{
public MyData(string value)
{
Value = value;
}
public Guid Id { get; } = Guid.NewGuid();
public string Value { get; set; }
}
以及一个简单的控制台应用程序来运行:
public static class TestProgram
{
public static void Main()
{
var service = new MyService();
service.AllData.Connect()
.Bind(out var myData)
.Subscribe(_=> Console.WriteLine("data in"), ()=> Console.WriteLine("COMPLETE"));
while (Continue())
{
Console.WriteLine("");
Console.WriteLine("");
Console.WriteLine($"Triggering Server Call, current data is: {string.Join(", ", myData.Select(x=> x.Value))}");
service.TriggerServer();
}
}
private static bool Continue()
{
Console.WriteLine("Press any key to call server, x to exit");
var key = Console.ReadKey();
return key.Key != ConsoleKey.X;
}
}
第一次尝试使用 Rx 看起来非常好
我建议进行一些更改:
1(从构造函数中删除Initialize()
调用并使其成为公共方法 - 对单元测试有很大帮助,现在如果需要,您可以await
它
public static void Main()
{
var service = new MyService();
service.Initialize();
2( 将Throttle
添加到触发器 - 这将修复对返回相同结果的服务器的并行调用
3(不要做任何可能造成Subscribe
的事情,改用Do
:
var sync = _serverSubject
.Throttle(Timespan.FromSeconds(0.5), RxApp.TaskPoolScheduler) // you can pass a scheduler via arguments, or use TestScheduler in unit tests to make time pass faster
.Do(async _ =>
{
var data = await GetDataFromServer().ConfigureAwait(false); // I just think this is more readable, your way was also correct
cache.AddOrUpdate(data);
})
// .Retry(); // or anything alese to handle failures
.Subscribe();
我把我得出的解决方案作为我的解决方案,以防万一有其他人在互联网上徘徊时发现了这一点。
我最终删除了所有主题并将几个 SourceCache 链接在一起,所以当一个更改时,它会推送到另一个,依此类推。 为了简洁起见,我删除了一些代码:
public class MyService : IDisposable
{
private SourceCache<MyData, Guid> _localCache = new SourceCache<MyData, Guid>(x=> x.Id);
private SourceCache<MyData, Guid> _serverCache = new SourceCache<MyData, Guid>(x=> x.Id);
public MyService()
{
var localdata = _localCache.Connect();
var serverdata = _serverCache.Connect();
var alldata = localdata.Merge(serverdata);
AllData = alldata.AsObservableCache();
}
public IObservableCache<MyData, Guid> AllData { get; }
public IObservable<Unit> TriggerLocal()
{
return LoadLocalAsync().ToObservable();
}
public IObservable<Unit> TriggerServer()
{
return LoadServerAsync().ToObservable();
}
}
编辑:我再次更改了它以删除任何缓存链 - 我只是在内部管理一个缓存。 教训是不要太早发布。