可观察以定期处理目录中的文件



我正在尝试生成一系列事件,这些事件由目录中现有文件的文件名组成。

这很有效。代码是这样的:

Observable.Generate(
                        ExistingFiles(path, filter),
                        Condition(),
                        Iterate(),
                        CreateFileChangedEvent()
                        )
           private static Func<List<string>.Enumerator, FileChangedEvent> CreateFileChangedEvent()
            {
                return enumerator => new FileChangedEvent(enumerator.Current, @"Existing");
            }
            private static Func<List<string>.Enumerator, List<string>.Enumerator> Iterate()
            {
                return enumerator =>
                {
                    enumerator.MoveNext();
                    return enumerator;
                };
            }
            private static List<string>.Enumerator ExistingFiles(string path, string filter)
            {
                List<string>.Enumerator files =
                    Directory.GetFiles(path, filter)
                    .ToList()
                    .GetEnumerator();
                // Advance the enumerator to the first result
                files.MoveNext();
                return files;
            }
            private static Func<List<string>.Enumerator, bool> Condition()
            {
                return enumerator => enumerator.Current != null;
            }
        }
我想将此可观察量

包装在另一个可观察量中,该可观察量将按时间触发文件枚举。我对重复/重播的所有尝试都不会触发文件搜索并生成以前的事件,这不是我想要的。

提前谢谢。

编辑 1 :好吧,代码可以使用ToObservable((简化

Directory.GetFiles(path, filter)
    .ToObservable()
    .Select(s => new FileChangedEvent(s, @"Existing"))

如果你想让你的文件搜索每隔一段时间,你可以做如下的事情:

  Observable.Interval(TimeSpan.FromSeconds(2))
    .Select(_ => 
      Directory.GetFiles(path, filter)
     .ToObservable()
     .Select(s => new FileChangedEvent(s, @"Existing"))
   )
   .SelectMany(_ => _)
   .Subscribe(onNewFileChangedEvent => {
     Console.WriteLine(onNewFileChangedEvent.ToString());
   });

但这会产生相同的"现有"文件更改事件,您需要在将它们发送到订阅之前删除重复数据(我猜?您可以使用 Buffer 运算符来创建两组文件的差异并仅发出这些文件。

为什么选择许多((

原始流从Interval返回计时器事件。在每个事件中,我们都返回一个新的FileChangedEvents流。

   t1--t2--t3
          
            ---t3{evt1,evt2,evt3}
           ---t{evt1,evt2,evt3}
      ---t1{evt1,evt2,evt3}

因为我们想将它们融化回一个订阅者可以使用的流中,所以我们必须合并这些结果。输入将投影和拼合为一个步骤的SelectMany()。因为我们不是 我们对selectManyprojection属性不感兴趣,所以我们表明我们不会对指示的项目(按约定(使用_ => _下划线语法执行任何操作。

我不能只使用 Merge(( 运算符吗?

Merge()本质上是标识SelectMany(_ => _)的语义等效项(有些重载会执行一些有趣的事情,例如控制并发性(。连续生产要合并的序列当然没有问题。这是原始实现

相关内容

  • 没有找到相关文章

最新更新