我正在尝试生成一系列事件,这些事件由目录中现有文件的文件名组成。
这很有效。代码是这样的:
Observable.Generate(
ExistingFiles(path, filter),
Condition(),
Iterate(),
CreateFileChangedEvent()
)
private static Func<List<string>.Enumerator, FileChangedEvent> CreateFileChangedEvent()
{
return enumerator => new FileChangedEvent(enumerator.Current, @"Existing");
}
private static Func<List<string>.Enumerator, List<string>.Enumerator> Iterate()
{
return enumerator =>
{
enumerator.MoveNext();
return enumerator;
};
}
private static List<string>.Enumerator ExistingFiles(string path, string filter)
{
List<string>.Enumerator files =
Directory.GetFiles(path, filter)
.ToList()
.GetEnumerator();
// Advance the enumerator to the first result
files.MoveNext();
return files;
}
private static Func<List<string>.Enumerator, bool> Condition()
{
return enumerator => enumerator.Current != null;
}
}
我想将此可观察量包装在另一个可观察量中,该可观察量将按时间触发文件枚举。我对重复/重播的所有尝试都不会触发文件搜索并生成以前的事件,这不是我想要的。
提前谢谢。
编辑 1 :好吧,代码可以使用ToObservable((简化
Directory.GetFiles(path, filter)
.ToObservable()
.Select(s => new FileChangedEvent(s, @"Existing"))
如果你想让你的文件搜索每隔一段时间,你可以做如下的事情:
Observable.Interval(TimeSpan.FromSeconds(2))
.Select(_ =>
Directory.GetFiles(path, filter)
.ToObservable()
.Select(s => new FileChangedEvent(s, @"Existing"))
)
.SelectMany(_ => _)
.Subscribe(onNewFileChangedEvent => {
Console.WriteLine(onNewFileChangedEvent.ToString());
});
但这会产生相同的"现有"文件更改事件,您需要在将它们发送到订阅之前删除重复数据(我猜?您可以使用 Buffer 运算符来创建两组文件的差异并仅发出这些文件。
为什么选择许多((
原始流从Interval
返回计时器事件。在每个事件中,我们都返回一个新的FileChangedEvents
流。
t1--t2--t3
---t3{evt1,evt2,evt3}
---t{evt1,evt2,evt3}
---t1{evt1,evt2,evt3}
因为我们想将它们融化回一个订阅者可以使用的流中,所以我们必须合并这些结果。输入将投影和拼合为一个步骤的SelectMany()
。因为我们不是 我们对selectMany
的projection
属性不感兴趣,所以我们表明我们不会对指示的项目(按约定(使用_ => _
下划线语法执行任何操作。
我不能只使用 Merge(( 运算符吗?
Merge()
本质上是标识SelectMany(_ => _)
的语义等效项(有些重载会执行一些有趣的事情,例如控制并发性(。连续生产要合并的序列当然没有问题。这是原始实现