我得到了一个可观察的,我想将它们分组。以前,但是我想要最新的值,而不是获取钥匙。我也想要更新,跳过最新值。
在此示例中,为了使其更简单,我们只能与Modulo结果相等。但是我希望我的解决方案适用于所有内容。
示例:
-------- 15 ----- 25 ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
让我们在上一个示例上添加一些时间点,我们将订阅:
--- S1 ------ 15--S2 --- 25 -----------------...
预期的结果是:
s1接收最新:15,更新:可观察的以25开始,更新%10 == 5,以后到达。说明:15到达后将通知S1,15是最新的元素,因此我想立即将其避开。第二个论点将是可观察的,将来会产生以后的25,而将来的%10 == 5个元素。
s2接收最新:15,更新:可观察的以25开始,更新%10 == 5,将稍后到达。说明:S2将在订阅中通知,15是最新的元素,所以我想立即将其删除。第二个论点将是可观察的,将来会产生以后的25,而将来的%10 == 5个元素。
S3接收最新:25,更新:可观察到的更新%10 == 5,以后到达。说明:S3将在订阅中通知,25是最新元素,所以我想立即将其放置。第二个论点将是可观察的,将来会产生%10 == 5个元素。
这是一些分辨率的尝试:
下面的代码使用元组和Nunit。
首次尝试
[Test]
public void WhenWeGroupByReplaying1()
{
var subject = new Subject<uint>();
var observable = subject.GroupBy(t => t%10)
.Select(t =>
{
var connectableObservable = t.Replay(1);
connectableObservable.Connect();
return (key: t.Key, updates: connectableObservable);
}).Replay();
observable.Connect();
// I will block on the First of the lambda below
var getLastAndUpdates = observable
.Select(t => (first: t.updates.First(),updates: t.updates.Skip(1)));
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[1] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[1] - UPDATE: {t2}"));
});
subject.OnNext(15);
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[2] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[2] - UPDATE: {t2}"));
});
subject.OnNext(25);
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[3] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[3] - UPDATE: {t2}"));
});
}
该解决方案将被评论中所示。
第二次尝试
[Test]
public void WhenWeGroupByReplaying2()
{
var subject = new Subject<uint>();
var observable = subject.GroupBy(t => t, t => t, new ModuloEqualityComparer())
.Select(t =>
{
var connectableObservable = t.Publish(t.Key);
connectableObservable.Connect();
return (key: t.Key, updates: connectableObservable);
}).Replay();
observable.Connect();
var getLastAndUpdates = observable
.Select(t => (first: t.updates.First(),updates: t.updates.Skip(1)));
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[1] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[1] - UPDATE: {t2}"));
});
subject.OnNext(15);
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[2] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[2] - UPDATE: {t2}"));
});
subject.OnNext(25);
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[3] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[3] - UPDATE: {t2}"));
});
}
private class ModuloEqualityComparer : IEqualityComparer<uint>
{
public bool Equals(uint x, uint y)
{
return x % 10 == y % 10;
}
public int GetHashCode(uint obj)
{
return (obj % 10).GetHashCode();
}
}
结果:
[1] - FIRST: 15
[1] - UPDATE: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25
预期结果:(确切的订单不含用(
[1] - FIRST: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25
第三尝试
[Test]
public void WhenWeGroupByReplaying3()
{
var subject = new Subject<uint>();
var observable = subject.GroupBy(t => (key: t%10, value:t), t => t, new ModuloEqualityComparer2())
.Select(t =>
{
var connectableObservable = t.Publish(t.Key.Item2);
connectableObservable.Connect();
return (key: t.Key, updates: connectableObservable);
}).Replay();
observable.Connect();
var getLastAndUpdates = observable
.Select(t => (first: t.updates.First(),updates: t.updates.Skip(1)));
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[1] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[1] - UPDATE: {t2}"));
});
subject.OnNext(15);
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[2] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[2] - UPDATE: {t2}"));
});
subject.OnNext(25);
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[3] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[3] - UPDATE: {t2}"));
});
}
private class ModuloEqualityComparer2 : IEqualityComparer<(uint,uint)>
{
private readonly ModuloEqualityComparer _moduloEqualityComparer = new ModuloEqualityComparer();
public bool Equals((uint, uint) x, (uint, uint) y)
{
return _moduloEqualityComparer.Equals(x.Item1, y.Item1);
}
public int GetHashCode((uint, uint) obj)
{
return _moduloEqualityComparer.GetHashCode(obj.Item1);
}
}
结果:
[1] - FIRST: 15
[1] - UPDATE: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25
预期结果:(确切的订单不含用(
[1] - FIRST: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25
感谢您的阅读。
我不完全确定您要实现的目标,但希望这会帮助您:
您的代码有几个问题:
-
.First()
是有原因的。您不应该使用RX
使用阻止代码 -
.Replay()
需要虚拟订阅才能正常工作。我不确定这是否是困扰您的代码,但要实现您的目标,您想要那个。 - 嵌套订阅通常是一个坏主意。我已经用
.Merge()
替换了嵌套的订阅。
如果这不能解决您的问题,我建议修改您的问题,以描述您尝试使用RX完成的工作。这有点像XY的情况。
这是代码:
var subject = new Subject<uint>();
var observable = subject.GroupBy(t => t % 10)
.Select(t => t.Replay(1).RefCount()).Replay().RefCount();
// dummy subscriptions required for Replay to work correctly.
var dummySub = observable.Merge().Subscribe();
observable
.Select(o => o.Select((t, index) => (t.Key, t.num, index)))
.Merge()
.Subscribe(t =>
{
if (t.index == 0)
Console.WriteLine($"[1] - FIRST: {t.num}");
else
Console.WriteLine($"[1] - UPDATE: {t.num}");
});
subject.OnNext(15);
observable
.Select(o => o.Select((t, index) => (t.Key, t.num, index)))
.Merge()
.Subscribe(t =>
{
if (t.index == 0)
Console.WriteLine($"[1] - FIRST: {t.num}");
else
Console.WriteLine($"[1] - UPDATE: {t.num}");
});
subject.OnNext(25);
observable
.Select(o => o.Select((t, index) => (t.Key, t.num, index)))
.Merge()
.Subscribe(t =>
{
if (t.index == 0)
Console.WriteLine($"[1] - FIRST: {t.num}");
else
Console.WriteLine($"[1] - UPDATE: {t.num}");
});