如何在支持取消的情况下,为使用反应式扩展 (Rx) 的慢速使用者对 IObservable 主题进行排队



首先是一些背景,我编写了一个名为Duplicitiy(github.com)的开源.NET库,它使用FileSystemWatcher复制两个目录之间的所有文件更改。

我编写了一个实现IObservable<FileSystemChange>FileSystemObservable类(它使用 FSWatcher 包装实际FileSystemWatcher)。创建、修改或删除文件或目录时,将使用反应式扩展通过Subject<FileSystemChange>发布更改。

然后,我使用以下订阅订阅此可观察量。

 return observable
          .Buffer(() => observable.Throttle(TimeSpan.FromSeconds(2)).Timeout(TimeSpan.FromMinutes(1)))     
          .PrioritizeFileSystemChanges()           
          .SelectMany(x => x);
更改

将缓冲,直到至少有 2 秒的时间段,最多 1 分钟没有任何更改。这是因为在删除目录时,FileSystemWatcher会通知所有包含的文件和目录。我们可以通过吞下目录中包含的更改来优化行为,只需删除订阅者中的父级。这由PrioritizeFileSystemChanges筛选器处理。它还允许我们忽略在缓冲区窗口中创建和随后删除的文件,从而再次减少目标上的 IO 操作。

这是有效的,尽管目前以一种幼稚的方式,不支持失败/重试。

但是,我的问题是,此可观察量的订阅者可能需要合理的时间来处理每个更改。例如,将大文件复制到慢速文件系统。当当前正在复制的同一文件发生新的文件系统更改时,我该如何处理中止正在进行的操作。或者,如果文件包含在缓冲列表中但未完成,如何删除或排除它?

我假设需要对原始可观察量进行另一个订阅,但不确定如何最好地共享状态或修改待处理任务?必须按照接收更改的顺序处理更改,该顺序指示队列。但是,新的文件系统更改可能适用于需要取消或删除的排队操作。队列不是为无序删除而设计的。

例如,如果我们当前正在复制文件FooBar.txt并且删除了Foo目录。然后,必须取消目录和所有子目录的任何正在进行或挂起的更改。这可能是任务并行库的一个用例,还是我可以采取一些反应式方法?

任何 github 拉取请求也会被善意地收到!

你在这里似乎有几个目标/问题:

  1. 删除由于以后的更改而不再需要的早期更改。 链表可能非常适合此。 它为常规队列使用和良好的项目删除性能提供了良好的性能。
  2. 取消由于以后的更改而不再需要的正在进行的操作。这还将包括需要重新启动的操作。 这将要求您找到一个允许您取消正在进行的操作的库。 System.IO 类不提供此类取消,因此您需要找到一个库或自己创建库。
  3. 这可能是任务并行库的一个用例,还是我可以采取一些反应式方法? 你的措辞让我印象深刻,好像这里有一个或另一个选择,但没有理由你不能将两者混合在一起。 文件更改的可观察量是一个很好的起点 (RX)。 正在进行的操作可能会作为获取CancellationToken并返回Task (TPL) 的方法实现。

这里缺少的步骤似乎是如何将更改的"队列"转到实际工作。 基本上,订阅必须(快速)排队更改并启动一个(慢速,异步)方法,如果它尚未运行,则"递归"处理队列;像这样:

'changes is your returned observable
'toProcess is the "queue" of changes
'processor holds information about and the task of the in-progress operation
changes.Subscribe(Sub(c)
                     UpdateQueueWithChange(c, toProcess, processor)
                     If processor.Task.IsCompleted Then
                         ProcessNextChange(processor, toProcess)
                     End If
                  End Sub)

ProcessNextChange 是一种方法,它将获取队列中的下一个更改,启动操作,设置操作任务的回调以重新调用 ProcessNextChange。 如果没有留下任何更改,则应为processor提供一个已完成的任务,该任务不会重新调用 ProcessNextChange。

UpdateQueueWithChange需要更新"队列"并在必要时取消正在进行的操作,这应该会触发对ProcessNextChange的调用,因为任务完成将启动下一个操作。

如果要在取消对可观察更改的订阅时取消操作,我建议将一次性订阅与存储CancellationDispoable(由ProcessNextChange更新并额外存储在processor中)的SerialDisposable一起放入CompositeDisposable中,这是操作方法所需的CancellationToken的来源。ProcessNextChange 将在启动操作之前检查 SerialDisposable 以查看它是否已释放。 复合材料一次性将是您存储在某个地方以结束整个事情的东西。

CompositeDisposable 'this is what your application keeps around
|- IDisposable from subscription to changes observable
|- SerialDisposable
   |- .Disposable property = CancellationDisposable 
      'changed each time ProcessNextChange is called

相关内容

最新更新