给定一个类:
class Foo { DateTime Timestamp {get; set;} }
。和一个IObservable<Foo>
,保证单调增加Timestamp
秒,如何根据这些Timestamp
生成分块到列表中的IObservable<IList<Foo>>
?
即每个IList<Foo>
应该有五秒钟的事件,或者其他什么。我知道我可以在TimeSpan
过载的情况下使用Buffer
,但我需要从事件本身而不是挂钟中获取时间。(除非有一种聪明的方法在这里提供一个使用IObservable
本身作为.Now
来源的IScheduler
?
如果我尝试像这样使用Observable.Buffer(this IObservable<Foo> source, IObservable<Foo> bufferBoundaries)
重载:
IObservable<Foo> foos = //...;
var pub = foos.Publish();
var windows = pub.Select(x => new DateTime(
x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged();
pub.Buffer(windows).Subscribe(x => t.Dump())); // linqpad
pub.Connect();
。然后IList
实例包含导致窗口关闭的项目,但我真的希望这个项目进入下一个窗口/缓冲区。
例如,使用时间戳[0, 1, 10, 11, 15]
您将获得[[0], [1, 10], [11, 15]]
块而不是[[0, 1], [10, 11], [15]]
这里有一个想法。组键条件是"窗口编号",我使用GroupByUntil
。这在您的示例中为您提供了所需的输出(我像该示例一样使用了 int 流 - 但您可以替换任何需要对窗口进行编号的内容(。
public class Tests : ReactiveTest
{
public void Test()
{
var scheduler = new TestScheduler();
var xs = scheduler.CreateHotObservable<int>(
OnNext(0, 0),
OnNext(1, 1),
OnNext(10, 10),
OnNext(11, 11),
OnNext(15, 15),
OnCompleted(16, 0));
xs.Publish(ps => // (1)
ps.GroupByUntil(
p => p / 5, // (2)
grp => ps.Where(p => p / 5 != grp.Key)) // (3)
.SelectMany(x => x.ToList())) // (4)
.Subscribe(Console.WriteLine);
scheduler.Start();
}
}
笔记
- 我们发布源流是因为我们会订阅不止一次。
- 这是一个创建组键的功能 - 使用它从您的项目类型生成窗口编号。
- 这是组终止条件 - 使用它来检查另一个窗口中项目的源流。请注意,这意味着窗口不会关闭,直到它外部的元素到达,或者源流终止。如果您考虑一下,这是显而易见的 - 您想要的输出需要考虑窗口结束后的下一个元素。请注意,如果您的源与实时有任何关系,则可以将其与输出术语的 null/默认实例的
Observable.Timer+Select
合并,以更早地终止流。 - 选择多个将组放入列表中并平展流。
此示例将在 LINQPad 中很好地运行,如果您包含 nuget 包 rx 测试。新建一个测试实例,然后运行Test()
方法。
我认为 James World 的答案更整洁/更具可读性,但对于后代,我找到了另一种使用 Buffer()
的方法:
IObservable<Foo> foos = //...;
var pub = foos.Publish();
var windows = pub.Select(x => new DateTime(
x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks))
.DistinctUntilChanged().Publish.RefCount();
pub.Buffer(windows, x => windows).Subscribe(x => t.Dump()));
pub.Connect();
对于 10 米赛事,詹姆斯的方法速度是 2.5 倍以上(在我的机器上为 20 秒,而我的机器上为 56 秒(。
>Window
是Buffer
的推广,GroupJoin
是Window
(和Join
(的推广。 当您编写Window
或Buffer
查询时,您发现通知被错误地包含在窗口/列表的边缘或从边缘排除,然后根据GroupJoin
重新定义查询,以控制边缘通知到达的位置。
请注意,为了使关闭通知可用于新打开的窗口,您必须将边界定义为这些通知的窗口(窗口数据,而不是边界数据(。 在这种情况下,不能使用一系列 DateTime 值作为边界,必须改用一系列 Foo 对象。 为此,我已将您的Select
-> DistinctUntilChanged
查询替换为Scan
-> Where
-> Select
查询。
var batches = foos.Publish(publishedFoos => publishedFoos
.Scan(
new { foo = (Foo)null, last = DateTime.MinValue, take = true },
(acc, foo) =>
{
var boundary = foo.Timestamp - acc.last >= TimeSpan.FromSeconds(5);
return new
{
foo,
last = boundary ? foo.Timestamp : acc.last,
take = boundary
};
})
.Where(a => a.take)
.Select(a => a.foo)
.Publish(boundaries => boundaries
.Skip(1)
.StartWith((Foo)null)
.GroupJoin(
publishedFoos,
foo => foo == null ? boundaries.Skip(1) : boundaries,
_ => Observable.Empty<Unit>(),
(foo, window) => (foo == null ? window : window.StartWith(foo)).ToList())))
.Merge()
.Replay(lists => lists.SkipLast(1)
.Select(list => list.Take(list.Count - 1))
.Concat(lists),
bufferSize: 1);
仅当您期望序列最终结束并且您关心不删除最后一个通知时,才需要末尾的Replay
查询;否则,您可以简单地修改window.StartWith(foo)
以window.StartWith(foo).SkipLast(1)
以获得相同的基本结果,尽管最后一个缓冲区的最后一个通知将丢失。