我已经生成/测试了两个可观察对象来组合执行单个查询。
一个用户可以拥有多个角色。每当角色id改变时,数据就需要更新。但是,只有当查询处于活动状态(当前有一些控件需要数据)时,数据才应该更新。
在挂起查询时也可能发生角色Id更改。当查询再次激活时,数据也应该加载。
//Tuple has the Id of the current Role and the time that the Id updated
IObservable<Tuple<Guid, DateTime>> idUpdate
//Tuple has the state of the query (true=active or false=suspended)
//and the time the state of the query updated
IObservable<Tuple<bool, DateTime>> queryStateUpdate
我想创建
//A hot observable that pushes true whenever the query should execute
IObservable<bool> execute
我把它分成两个可以合并的case,但是我不知道如何创建case observable
- 情况a)角色Id更新&最后一个状态是Active
- 情况b)状态更新为有源&&这是角色Id更新后的第一个活动状态
我已经看过视频,李坎贝尔网站,初学者TOC等,但我似乎找不到一个很好的例子,这个rx加入。关于如何创建执行或案例可观察对象有什么想法吗?
鉴于所描述的问题-这有点模糊,因为我没有看到实际id (Guid
)用于什么,也没有DateTime
值-我有以下查询似乎可以解决您的问题:
IObservable<bool> execute =
idUpdate
.Publish(_idUpdate =>
from qsu in queryStateUpdate
select qsu.Item1
? _idUpdate.Select(x => true)
: Observable.Empty<bool>())
.Switch();
我已经用以下idUpdate
&queryStateUpdate
可见。
var rnd = new Random();
IObservable<Tuple<Guid, DateTime>> idUpdate =
Observable
.Generate(
0,
n => n < 10000,
n => n + 1,
n => Tuple.Create(Guid.NewGuid(), DateTime.Now),
n => TimeSpan.FromSeconds(rnd.NextDouble() * 0.1));
IObservable<Tuple<bool, DateTime>> queryStateUpdate =
Observable
.Generate(
0,
n => n < 100,
n => n + 1,
n => n % 2 == 0,
n => TimeSpan.FromSeconds(rnd.NextDouble() * 2.0))
.StartWith(true)
.DistinctUntilChanged()
.Select(b => Tuple.Create(b, DateTime.Now));
如果你能澄清一下你的问题,我可能会提供一个更好的答案来满足你的需要。
编辑:增加了当Id在非活动时发生变化时所需的"replay(1)"行为。
请注意,我已经摆脱了使用DateTime
的元组的需要。
IObservable<Guid> idUpdate = ...
IObservable<bool> queryStateUpdate = ...
var replay = new ReplaySubject<Guid>(1);
var disposer = new SerialDisposable();
Func<bool, IObservable<bool>, IObservable<Guid>,
IObservable<Guid>> getSwitch = (qsu, qsus, iu) =>
{
if (qsu)
{
return replay.Merge(iu);
}
else
{
replay.Dispose();
replay = new ReplaySubject<Guid>(1);
disposer.Disposable = iu.TakeUntil(qsus).Subscribe(replay);
return Observable.Empty<Guid>();
}
};
var query =
queryStateUpdate
.DistinctUntilChanged()
.Publish(qsus =>
idUpdate
.Publish(ius =>
qsus
.Select(qsu =>
getSwitch(qsu, qsus, ius))))
.Switch();
我读到的问题是说有一个通知idUpdate
流,只要设置了queryStateUpdate
,就会处理。如果没有设置queryStateUpdate
,则通知应该暂停,直到再次设置queryStateUpdate
。
在这种情况下,连接运算符不能解决您的问题。
我建议你在queryStateUpdate
未设置时需要某种形式的缓存,即
List<Tuple<Guid,DateTime>> cache = new List<Tuple<Guid,DateTime>>();
Subject<Tuple<Guid,DateTime>> execute = new Subject<Tuple<Guid,DateTime>>();
idUpdate.Subscribe( x => {
if (queryStateUpdate.Last().Item1) //might be missing something here with Last, you might need to copy the state out
exeucte.OnNext(x);
else
cache.Add(x);
});
queryStateUpdate.Subscribe(x=> {
if (x.Item1)
{
//needs threadsafety
foreach(var x in cache)
execute.OnNext(x);
cache.Clear();
});
感谢Enigmativity和AlSki。使用缓存,我找到了答案。
var execute = new Subject<Guid>();
var cache = new Stack<Guid>();
idUpdate.CombineLatest(queryStateUpdate, (id, qs) => new { id, qs }).Subscribe( anon =>
{
var id = anon.id;
var queryState = anon.qs;
//The roleId updated after the queryState updated
if (id.Item2 > queryState.Item2)
{
//If the queryState is active, call execute
if (observationState.Item1)
{
cache.Clear();
execute.OnNext(roleId.Item1);
return;
}
//If the id updated and the state is suspended, cache it
cache.Push(id.Item1);
}
//The queryState updated after the roleId
else if (queryState.Item2 > roleId.Item2)
{
//If the queryState is active and a roleId update has been cached, call execute
if (queryState.Item1 && cache.Count > 0)
{
execute.OnNext(cache.Pop());
cache.Clear();
}
}});