如何在RXSWIFT中实现串行网络调用的队列



我正在使用一个应用程序,我想使用rxswift和rxcocoa

实现以下内容
  1. 下载包含URL到x文件数的JSON
  2. 下载文件1,进程文件1
  3. 下载文件2,进程文件2
  4. 下载文件3,进程文件3

...等等

关键是每个文件的处理必须在下载下一个文件之前完成。至少必须按顺序执行文件处理顺序。如果我可以在文件1处理时开始下载文件2,那将是很棒的,但不是必需的。

我已经尝试使用SerialDispatchqueuesCheduler来制作这项工作,但是由于文件的大小不同,因此每个文件的下载在不同时间的下载,因此处理代码以与开始下载的顺序不同。

我可以通过使用nsoperations等轻松地实现它,但我想在此应用中继续使用Rx,因为它是我在此应用中其他地方使用的。

下面我包括了一个带有一些代码的摘要。为了这个问题而添加了评论。

       .flatMap { [unowned self] (tasks: [DiffTask]) -> Observable<ApplyDiffStatus> in
            return Observable.from(tasks)
                .observeOn(self.backgroundScheduler) // StackOverflow: backgroundScheduler is a SerialDispatchQueueScheduler
                .flatMapWithIndex({ [unowned self] (task, index) in
                    return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // StackOverflow: Downloads a file from a URL
                })
                .catchError({ (error) -> Observable<DictionaryUpdater.DiffTaskProgress> in
                    observable.onError(error)
                    throw error
                })
                .map({ (diffTask : DiffTaskProgress) -> DiffTaskProgress.Progress in
                    // Stack Overflow: I've wrapped much of the progress observable in a Observable<UpdateProgress>
                    switch diffTask.progress {
                    case .started(currentTask: let currentTask, taskCount: let taskCount):
                        observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                    case .finished(data: _, currentTask: let currentTask, taskCount: let taskCount):
                        observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                    case .progress(completion: _, currentTask: let currentTask, taskCount: let taskCount):
                        observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                    }
                    return diffTask.progress
                })
                .flatMap({ [unowned self] (progress: DiffTaskProgress.Progress) -> Observable<ApplyDiffStatus> in
                    switch progress {
                    case .finished(data: let data, currentTask: let currentTask, taskCount: let taskCount):
                        return self.applyDiff(data, currentTask: currentTask, taskCount: taskCount) // StackOverflow: PROCESSES THE FILE THAT WAS DOWNLOADED
                    default:
                        return Observable.empty()
                    }
                })
        }

我通过使用concatMap操作员设法解决了它。所以而不是

.flatMapWithIndex({ [unowned self] (task, index) in
    return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // StackOverflow: Downloads a file from a URL
})

我做了这样的事情:

tasks.enumerated().concatMap { (index, task) in
    return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count)
}

concatMap操作员确保在发出任何其他信号之前完成了第一个可观察的可观察器。我必须使用enumerated(),因为ConcatMap不带concatMapWithIndex,但它起作用:)

最新更新