可观察的缓冲液



我正在开发一个解决方案,例如用户以高速率发生变化。我需要记录每个更改,延迟通知然后返回不同的更改,例如已更改的用户的用户ID。我使用RX提出了以下代码:

private Subject<int> userEventSubject= new Subject<int>();
userEventSubject
    .Buffer(EVENT_BUFFER_DELAY)
    .Where(buffer => buffer.Count > 0)                
    .Subscribe(OnEventBufferProcess);

这似乎可以正常工作,我得到了

添加的所有值
userEventSubject.OnNext(userId);

我的问题是:我可以区分更改,例如,当具有相同用户ID值的多个ONNEXT时,我不希望生成的缓冲区包含重复。当然,我可以在订阅处理程序中不同的值,但是我想知道这是否可以在RX级别上完成?我尝试了独特的方法,但仍然可以获得所有值。

由于我想要的只是跟踪延迟期间所做的更改,返回订阅处理程序并重新开始,我需要清除usereventsubject吗?

如果使用Observable.Distinct方法,您将检查不同的IList<int> S,这确实不是您想要的行为。

看来您需要将不同的区别应用于缓冲区内的元素。

您可以使用Observable.Select方法将其应用于每个缓冲区:

private Subject<int> userEventSubject= new Subject<int>();
userEventSubject
    .Buffer(EVENT_BUFFER_DELAY)
    .Where(buffer => buffer.Count > 0)
    .Select(buffer => buffer.Distinct()) // distinct elements in each buffer
    .Subscribe(OnEventBufferProcess);

请注意,每当您使用Distinct()方法时,您都需要对检查平等的方式保持警惕,如果您使用参考类型 - 您可能需要实现某种类型的平等比较。在这种情况下,这不是问题,因为您正在使用ints。

您可以使用独特的未加工扩展方法:

private Subject<int> userEventSubject= new Subject<int>();
userEventSubject
    .DistinctUntilChanged()
    .Buffer(EVENT_BUFFER_DELAY)    
    .Where(buffer => buffer.Count > 0)                
    .Subscribe(OnEventBufferProcess);

相关内容

  • 没有找到相关文章

最新更新