如何根据 Func 将 IObservable 窗口/缓冲<T>为块<T>



给定一个类:

class Foo { DateTime Timestamp {get; set;} }

。和一个IObservable<Foo>,保证单调增加Timestamp秒,如何根据这些Timestamp生成分块到列表中的IObservable<IList<Foo>>

即每个IList<Foo>应该有五秒钟的事件,或者其他什么。我知道我可以在TimeSpan过载的情况下使用Buffer,但我需要从事件本身而不是挂钟中获取时间。(除非有一种聪明的方法在这里提供一个使用IObservable本身作为.Now来源的IScheduler

如果我尝试像这样使用Observable.Buffer(this IObservable<Foo> source, IObservable<Foo> bufferBoundaries)重载:

IObservable<Foo> foos = //...;
var pub = foos.Publish();
var windows = pub.Select(x => new DateTime(
    x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged();
pub.Buffer(windows).Subscribe(x => t.Dump()));  // linqpad
pub.Connect();

。然后IList实例包含导致窗口关闭的项目,但我真的希望这个项目进入下一个窗口/缓冲区。

例如,使用时间戳[0, 1, 10, 11, 15]您将获得[[0], [1, 10], [11, 15]]块而不是[[0, 1], [10, 11], [15]]

这里有一个想法。组键条件是"窗口编号",我使用GroupByUntil。这在您的示例中为您提供了所需的输出(我像该示例一样使用了 int 流 - 但您可以替换任何需要对窗口进行编号的内容(。

public class Tests : ReactiveTest
{
    public void Test()
    {
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateHotObservable<int>(
            OnNext(0, 0),
            OnNext(1, 1),
            OnNext(10, 10),
            OnNext(11, 11),
            OnNext(15, 15),
            OnCompleted(16, 0));                  
            
        xs.Publish(ps =>                                // (1)
            ps.GroupByUntil(
                p => p / 5,                             // (2)
                grp => ps.Where(p => p / 5 != grp.Key)) // (3)
            .SelectMany(x => x.ToList()))               // (4)
        .Subscribe(Console.WriteLine);
        
        scheduler.Start();
    }
}

笔记

  1. 我们发布源流是因为我们会订阅不止一次。
  2. 这是一个创建组键的功能 - 使用它从您的项目类型生成窗口编号。
  3. 这是组终止条件 - 使用它来检查另一个窗口中项目的源流。请注意,这意味着窗口不会关闭,直到它外部的元素到达,或者源流终止。如果您考虑一下,这是显而易见的 - 您想要的输出需要考虑窗口结束后的下一个元素。请注意,如果您的源与实时有任何关系,则可以将其与输出术语的 null/默认实例的 Observable.Timer+Select 合并,以更早地终止流。
  4. 选择多个将组放入列表中并平展流。

此示例将在 LINQPad 中很好地运行,如果您包含 nuget 包 rx 测试。新建一个测试实例,然后运行Test()方法。

我认为 James World 的答案更整洁/更具可读性,但对于后代,我找到了另一种使用 Buffer() 的方法:

IObservable<Foo> foos = //...;
var pub = foos.Publish();
var windows = pub.Select(x => new DateTime(
        x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks))
    .DistinctUntilChanged().Publish.RefCount();
pub.Buffer(windows, x => windows).Subscribe(x => t.Dump()));
pub.Connect();

对于 10 米赛事,詹姆斯的方法速度是 2.5 倍以上(在我的机器上为 20 秒,而我的机器上为 56 秒(。

>WindowBuffer的推广,GroupJoinWindow(和Join(的推广。 当您编写WindowBuffer查询时,您发现通知被错误地包含在窗口/列表的边缘或从边缘排除,然后根据GroupJoin重新定义查询,以控制边缘通知到达的位置。

请注意,为了使关闭通知可用于新打开的窗口,您必须将边界定义为这些通知的窗口(窗口数据,而不是边界数据(。 在这种情况下,不能使用一系列 DateTime 值作为边界,必须改用一系列 Foo 对象。 为此,我已将您的Select -> DistinctUntilChanged 查询替换为Scan -> Where -> Select 查询。

var batches = foos.Publish(publishedFoos => publishedFoos
  .Scan(
    new { foo = (Foo)null, last = DateTime.MinValue, take = true },
    (acc, foo) =>
    {
      var boundary = foo.Timestamp - acc.last >= TimeSpan.FromSeconds(5);
      return new
      {
        foo,
        last = boundary ? foo.Timestamp : acc.last,
        take = boundary
      };
    })
  .Where(a => a.take)
  .Select(a => a.foo)
  .Publish(boundaries => boundaries
    .Skip(1)
    .StartWith((Foo)null)
    .GroupJoin(
      publishedFoos,
      foo => foo == null ? boundaries.Skip(1) : boundaries,
      _ => Observable.Empty<Unit>(),
      (foo, window) => (foo == null ? window : window.StartWith(foo)).ToList())))
  .Merge()
  .Replay(lists => lists.SkipLast(1)
                        .Select(list => list.Take(list.Count - 1))
                        .Concat(lists),
          bufferSize: 1);

仅当您期望序列最终结束并且您关心不删除最后一个通知时,才需要末尾的Replay查询;否则,您可以简单地修改window.StartWith(foo)window.StartWith(foo).SkipLast(1)以获得相同的基本结果,尽管最后一个缓冲区的最后一个通知将丢失。

相关内容

  • 没有找到相关文章

最新更新