groupby具有最新元素和更新,而不是密钥和更新



我得到了一个可观察的,我想将它们分组。以前,但是我想要最新的值,而不是获取钥匙。我也想要更新,跳过最新值。

在此示例中,为了使其更简单,我们只能与Modulo结果相等。但是我希望我的解决方案适用于所有内容。

示例:

-------- 15 ----- 25 ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

让我们在上一个示例上添加一些时间点,我们将订阅:

--- S1 ------ 15--S2 --- 25 -----------------...

预期的结果是:

  • s1接收最新:15,更新:可观察的以25开始,更新%10 == 5,以后到达。说明:15到达后将通知S1,15是最新的元素,因此我想立即将其避开。第二个论点将是可观察的,将来会产生以后的25,而将来的%10 == 5个元素。

  • s2接收最新:15,更新:可观察的以25开始,更新%10 == 5,将稍后到达。说明:S2将在订阅中通知,15是最新的元素,所以我想立即将其删除。第二个论点将是可观察的,将来会产生以后的25,而将来的%10 == 5个元素。

  • S3接收最新:25,更新:可观察到的更新%10 == 5,以后到达。说明:S3将在订阅中通知,25是最新元素,所以我想立即将其放置。第二个论点将是可观察的,将来会产生%10 == 5个元素。

这是一些分辨率的尝试:

下面的代码使用元组和Nunit。

首次尝试

[Test]
public void WhenWeGroupByReplaying1()
{
    var subject = new Subject<uint>();
    var observable = subject.GroupBy(t => t%10)
        .Select(t =>
        {
            var connectableObservable = t.Replay(1);
            connectableObservable.Connect();
            return (key: t.Key, updates: connectableObservable);
        }).Replay();
    observable.Connect();
    // I will block on the First of the lambda below
    var getLastAndUpdates = observable
        .Select(t => (first: t.updates.First(),updates: t.updates.Skip(1)));
    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[1] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[1] - UPDATE: {t2}"));
    });
    subject.OnNext(15);
    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[2] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[2] - UPDATE: {t2}"));
    });
    subject.OnNext(25);
    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[3] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[3] - UPDATE: {t2}"));
    });
}

该解决方案将被评论中所示。

第二次尝试

[Test]
public void WhenWeGroupByReplaying2()
{
    var subject = new Subject<uint>();
    var observable = subject.GroupBy(t => t, t => t, new ModuloEqualityComparer())
        .Select(t =>
        {
            var connectableObservable = t.Publish(t.Key);
            connectableObservable.Connect();
            return (key: t.Key, updates: connectableObservable);
        }).Replay();
    observable.Connect();
    var getLastAndUpdates = observable
        .Select(t => (first: t.updates.First(),updates: t.updates.Skip(1)));
    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[1] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[1] - UPDATE: {t2}"));
    });
    subject.OnNext(15);
    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[2] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[2] - UPDATE: {t2}"));
    });
    subject.OnNext(25);
    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[3] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[3] - UPDATE: {t2}"));
    });
}
private class ModuloEqualityComparer : IEqualityComparer<uint>
{
    public bool Equals(uint x, uint y)
    {
        return x % 10 == y % 10;
    }
    public int GetHashCode(uint obj)
    {
        return (obj % 10).GetHashCode();
    }
}

结果:

[1] - FIRST: 15
[1] - UPDATE: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25

预期结果:(确切的订单不含用(

[1] - FIRST: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25

第三尝试

[Test]
public void WhenWeGroupByReplaying3()
{
    var subject = new Subject<uint>();
    var observable = subject.GroupBy(t => (key: t%10, value:t), t => t, new ModuloEqualityComparer2())
        .Select(t =>
        {
            var connectableObservable = t.Publish(t.Key.Item2);
            connectableObservable.Connect();
            return (key: t.Key, updates: connectableObservable);
        }).Replay();
    observable.Connect();
    var getLastAndUpdates = observable
        .Select(t => (first: t.updates.First(),updates: t.updates.Skip(1)));
    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[1] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[1] - UPDATE: {t2}"));
    });
    subject.OnNext(15);
    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[2] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[2] - UPDATE: {t2}"));
    });
    subject.OnNext(25);
    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[3] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[3] - UPDATE: {t2}"));
    });
}
private class ModuloEqualityComparer2 : IEqualityComparer<(uint,uint)>
{
    private readonly ModuloEqualityComparer _moduloEqualityComparer = new ModuloEqualityComparer();
    public bool Equals((uint, uint) x, (uint, uint) y)
    {
        return _moduloEqualityComparer.Equals(x.Item1, y.Item1);
    }

    public int GetHashCode((uint, uint) obj)
    {
        return _moduloEqualityComparer.GetHashCode(obj.Item1);
    }
}

结果:

[1] - FIRST: 15
[1] - UPDATE: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25

预期结果:(确切的订单不含用(

[1] - FIRST: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25

感谢您的阅读。

我不完全确定您要实现的目标,但希望这会帮助您:

您的代码有几个问题:

  1. .First()是有原因的。您不应该使用RX
  2. 使用阻止代码
  3. .Replay()需要虚拟订阅才能正常工作。我不确定这是否是困扰您的代码,但要实现您的目标,您想要那个。
  4. 嵌套订阅通常是一个坏主意。我已经用.Merge()替换了嵌套的订阅。

如果这不能解决您的问题,我建议修改您的问题,以描述您尝试使用RX完成的工作。这有点像XY的情况。

这是代码:

var subject = new Subject<uint>();
var observable = subject.GroupBy(t => t % 10)
    .Select(t => t.Replay(1).RefCount()).Replay().RefCount();
// dummy subscriptions required for Replay to work correctly.
var dummySub = observable.Merge().Subscribe();
observable
    .Select(o => o.Select((t, index) => (t.Key, t.num, index)))
    .Merge()
    .Subscribe(t =>
    {
        if (t.index == 0)
            Console.WriteLine($"[1] - FIRST: {t.num}");
        else
            Console.WriteLine($"[1] - UPDATE: {t.num}");
    });
subject.OnNext(15);
observable
    .Select(o => o.Select((t, index) => (t.Key, t.num, index)))
    .Merge()
    .Subscribe(t =>
    {
        if (t.index == 0)
            Console.WriteLine($"[1] - FIRST: {t.num}");
        else
            Console.WriteLine($"[1] - UPDATE: {t.num}");
    });
subject.OnNext(25);
observable
    .Select(o => o.Select((t, index) => (t.Key, t.num, index)))
    .Merge()
    .Subscribe(t =>
    {
        if (t.index == 0)
            Console.WriteLine($"[1] - FIRST: {t.num}");
        else
            Console.WriteLine($"[1] - UPDATE: {t.num}");
    });

相关内容

  • 没有找到相关文章

最新更新