如何使用swift组合异步处理一组任务



我有一个发布者,它接受网络调用并返回一个ID数组。我现在需要为每个ID调用另一个网络调用来获取我的所有数据。我希望最终的发布者拥有结果对象。

第一个网络结果:

"user": {
"id": 0,
"items": [1, 2, 3, 4, 5]
}

最终对象:

struct User {
let id: Int
let items: [Item]
... other fields ...
}
struct Item {
let id: Int
... other fields ...
}

处理多个网络呼叫:

userPublisher.flatMap { user in
let itemIDs = user.items
return Future<[Item], Never>() { fulfill in
... OperationQueue of network requests ...
}
}

我想并行执行网络请求,因为它们不相互依赖。我不确定Future是否就在这里,但我想我会有代码来做DispatchGroup或OperationQueue,并在它们全部完成时完成。还有更多的联合方式吗?

Combine是否有将一个流拆分为多个并行流并将这些流连接在一起的概念?

Combine提供了围绕URLSession的扩展来处理网络请求,除非您真的需要与基于OperationQueue的网络集成,否则Future是一个不错的选择。您可以运行多个Future并在某个时刻收集它们,但我确实建议您查看Combine的URLSession扩展。

struct User: Codable {
var username: String
}
let requestURL = URL(string: "https://example.com/")!
let publisher = URLSession.shared.dataTaskPublisher(for: requestURL)
.map { $0.data }
.decode(type: User.self, decoder: JSONDecoder())

关于运行一批请求,可以使用Publishers.MergeMany,即:

struct User: Codable {
var username: String
}
let userIds = [1, 2, 3]
let subscriber = Just(userIds)
.setFailureType(to: Error.self)
.flatMap { (values) -> Publishers.MergeMany<AnyPublisher<User, Error>> in
let tasks = values.map { (userId) -> AnyPublisher<User, Error> in
let requestURL = URL(string: "https://jsonplaceholder.typicode.com/users/(userId)")!
return URLSession.shared.dataTaskPublisher(for: requestURL)
.map { $0.data }
.decode(type: User.self, decoder: JSONDecoder())
.eraseToAnyPublisher()
}
return Publishers.MergeMany(tasks)
}.collect().sink(receiveCompletion: { (completion) in
if case .failure(let error) = completion {
print("Got error: (error.localizedDescription)")
}
}) { (allUsers) in
print("Got users:")
allUsers.map { print("($0)") }
}

在上面的示例中,我使用collect来收集所有结果,这会推迟向Sink发送值,直到所有网络请求都成功完成,但是您可以去掉collect,并在网络请求完成时逐个接收上面示例中的每个User

最新更新