触发第一个事件后,RX缓冲事件数秒



我有一个文件监视器,从中我观察创建和更改的事件。我希望当第一个事件被触发(创建或更改)时,它需要开始缓冲10秒,在这10秒之后,我想处理缓冲的事件。

我已经得到的是:

Observable.FromEventPattern<FileSystemEventArgs>(FileSystemWatcher, "Created")
                .Merge(Observable.FromEventPattern<FileSystemEventArgs>(FileSystemWatcher, "Changed"))
                .Buffer(TimeSpan.FromSeconds(10))
                .Subscribe(list =>
                {
                   Debug.WriteLine("Do something");
                });

此代码不调试。WriteLine("Do something");'每隔10秒。

编辑:好吧,让我试着用时间线来解释。

  1. 文件监视器空闲,没有触发事件。
  2. 在一段未知的时间后,文件被放置在目录
  3. 中。
  4. 创建的事件被触发
  5. 可观察列表开始缓冲(所有事件)10秒
  6. 在这10秒之后,订阅动作被执行,它将一次处理所有事件

希望这能说清楚

我假设您想要以下行为:

  1. 初始事件后,缓冲所有事件10秒。
  2. 一旦10秒窗口关闭,下一个应该触发一个新的10秒缓冲区,用于10秒后的所有事件。

假设5个事件均匀分布在5秒内,间隔13秒,然后另外5个事件均匀分布在5秒内。大理石图看起来像这样:

timeline: 0--1--2--3--4--5--6--7--8--9-10-11-12-13-14-15-16-17-18-19-20-21-22-23-24-25-26-27
events  : x--x--x--x--x-------------------------------------x--x--x--x--x------------------
stdbuff : |----------------------------|-----------------------------|---------------------
desired : BeginCapture-----------------Return---------------BeginCapture------------------Return

使用直接的Buffer的问题是,它看起来像上面标记的stdbuff,并将第二组事件分成两组,导致第二组事件的两个列表:一个有三个事件,一个有两个事件。您需要一个列表(用于第二组),使用类似desired流的逻辑。从0开始捕获,从10返回列表。从17开始捕获,在27返回列表。

如果我误解了你(再次),请张贴一个大理石图表,类似于上面,代表你想要的东西如何工作。


假设我理解正确,下面的代码将工作…

//var initialSource = Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Created))
//  .Merge(Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Changed)));
    //Comment this out, and use the above lines for your code. This just makes testing the Rx components much easier.
var initialSource = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5)
    .Concat(Observable.Empty<long>().Delay(TimeSpan.FromSeconds(13)))
    .Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5));
initialSource
    .Publish( _source => _source 
        .Buffer(_source
            .Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
            .DistinctUntilChanged()
            .Delay(TimeSpan.FromSeconds(10))
        )
    )
    .Subscribe(list =>
    {
        Debug.WriteLine($"Time-stamp: {DateTime.Now.ToLongTimeString()}");
        Debug.WriteLine($"List Count: {list.Count}");
    });

:

首先,我们需要识别"主要事件",即在上面的desired流描述中代表BeginCapture注释的事件。可以这样找到:

 var primaryEvents = initialSource
        .Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
        .DistinctUntilChanged();

一旦我们有了BeginCapture事件,它可以表示窗口打开,就很容易找到Return事件,或者窗口关闭:

 var closeEvents = primaryEvents.Delay(TimeSpan.FromSeconds(10));

实际上,由于我们不关心关闭和打开之间发生什么,因此我们实际上只需要担心关闭事件,因此我们可以将其缩小为:

 var closeEvents = initialSource
        .Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
        .DistinctUntilChanged()
        .Delay(TimeSpan.FromSeconds(10));

将其插入Buffer, closeEventsbufferBoundaries:

var bufferredLists = initialSource
    .Buffer(initialsource
        .Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
        .DistinctUntilChanged()
        .Delay(TimeSpan.FromSeconds(10))
    );

最后,由于我们对initialSource有多个订阅,因此我们希望使用Publish来确保并发工作正常,从而导致最终的答案

相关内容

  • 没有找到相关文章

最新更新