我正在开发一个解决方案,例如用户以高速率发生变化。我需要记录每个更改,延迟通知然后返回不同的更改,例如已更改的用户的用户ID。我使用RX提出了以下代码:
private Subject<int> userEventSubject= new Subject<int>();
userEventSubject
.Buffer(EVENT_BUFFER_DELAY)
.Where(buffer => buffer.Count > 0)
.Subscribe(OnEventBufferProcess);
这似乎可以正常工作,我得到了
添加的所有值userEventSubject.OnNext(userId);
我的问题是:我可以区分更改,例如,当具有相同用户ID值的多个ONNEXT时,我不希望生成的缓冲区包含重复。当然,我可以在订阅处理程序中不同的值,但是我想知道这是否可以在RX级别上完成?我尝试了独特的方法,但仍然可以获得所有值。
由于我想要的只是跟踪延迟期间所做的更改,返回订阅处理程序并重新开始,我需要清除usereventsubject吗?
如果使用Observable.Distinct
方法,您将检查不同的IList<int>
S,这确实不是您想要的行为。
看来您需要将不同的区别应用于缓冲区内的元素。
您可以使用Observable.Select
方法将其应用于每个缓冲区:
private Subject<int> userEventSubject= new Subject<int>();
userEventSubject
.Buffer(EVENT_BUFFER_DELAY)
.Where(buffer => buffer.Count > 0)
.Select(buffer => buffer.Distinct()) // distinct elements in each buffer
.Subscribe(OnEventBufferProcess);
请注意,每当您使用Distinct()
方法时,您都需要对检查平等的方式保持警惕,如果您使用参考类型 - 您可能需要实现某种类型的平等比较。在这种情况下,这不是问题,因为您正在使用int
s。
您可以使用独特的未加工扩展方法:
private Subject<int> userEventSubject= new Subject<int>();
userEventSubject
.DistinctUntilChanged()
.Buffer(EVENT_BUFFER_DELAY)
.Where(buffer => buffer.Count > 0)
.Subscribe(OnEventBufferProcess);