通过属性和超时分组



我需要将对象流转换为一批对象流,并使用 vactive Extensions 通过属性值分组它们:

class Record
{
    public string Group;
    public int Value;
}
IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
    // ...
}

批次完成并发送到输出流时,这些都发生了:

  • 一个新对象来自源流,其Group值与以前的值不同
  • N秒的源流中没有新对象

例如,如果a1表示new Record { Group = "a", Value = 1}

input:   -a1-a2-a3-b1-b2-
output:  -[a1, a2, a3]-[b1, b2]-
input:   -a1-a2----------a3-
output:  -[a1, a2]-------[a3]-

尝试了GroupByUntilDebounceBufferTimer的各种组合。如何完成?

诀窍是在本身上使用GroupByUntilThrottle使用:

IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
    return source.GroupByUntil(x => x.Group, g => g.Throttle(timeout))
                 .SelectMany(x => x.ToList());
}

相关内容

  • 没有找到相关文章

最新更新