在后台线程中执行合并未来不起作用



如果你在操场上运行这个:

import Combine
import Foundation
struct User {
let name: String
}
var didAlreadyImportUsers = false
var importUsers: Future<Bool, Never> {
Future { promise in
sleep(5)
promise(.success(true))
}
}
var fetchUsers: Future<[User], Error> {
Future { promise in
promise(.success([User(name: "John"), User(name: "Jack")]))
}
}
var users: AnyPublisher<[User], Error> {
if didAlreadyImportUsers {
return fetchUsers
.receive(on: DispatchQueue.global(qos: .userInitiated))
.eraseToAnyPublisher()
} else {
return importUsers
.receive(on: DispatchQueue.global(qos: .userInitiated))
.setFailureType(to: Error.self)
.combineLatest(fetchUsers)
.map { $0.1 }
.eraseToAnyPublisher()
}
}
users
.receive(on: DispatchQueue.global(qos: .userInitiated))
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
})
print("run")

输出将是:

[User(name: "John"), User(name: "Jack")]
run
finished

但我期待得到:

run
[User(name: "John"), User(name: "Jack")]
finished

因为接收器应在后台线程中运行代码。我在这里错过了什么。 我需要输入代码吗:

sleep(5)
promise(.success(true))

在后台线程中? 那么的目的是什么

.receive(on: DispatchQueue.global(qos: .userInitiated))

您的未来在创建后立即运行,因此在您的情况下,一旦访问此属性:

var importUsers: Future<Bool, Never> {
Future { promise in
sleep(5)
promise(.success(true))
}
}

由于Future会立即运行,这意味着传递给 promise 的闭包会立即执行,使主线程在继续之前休眠 5 秒。在您的情况下,一旦您访问users,就会创建Future,这是在主线程上完成的。

receive(on:影响sink(或下游发布者(接收值的线程,而不是创建值的位置。由于期货在您调用.sink时已经完成,因此完成和发出的价值会立即交付给sink。在后台队列上,但仍然立即。

在那之后,你终于打到了print("run")线。

如果将sleep(5)位替换为以下内容:

var importUsers: Future<Bool, Never> {
Future { promise in
DispatchQueue.global().asyncAfter(deadline: .now() + 5) {
promise(.success(true))
}
}
}

并对您的订阅代码进行一些小的调整:

import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
var cancellables = Set<AnyCancellable>()
users
.receive(on: DispatchQueue.global(qos: .userInitiated))
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
}).store(in: &cancellables)

您将看到输出按预期打印,因为初始未来不会阻塞主线程五秒钟。

或者,如果您保持睡眠状态并像这样订阅,您将看到相同的输出:

import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
var cancellables = Set<AnyCancellable>()
users
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
}).store(in: &cancellables)

原因是您在后台线程上subscribe,因此订阅和所有内容都是在主线程异步设置的,这会导致print("run")在收到Future的结果之前运行。但是,一旦访问users属性(位于主线程上(,主线程仍然会休眠 5 秒,因为那是您初始化Future的时候。因此,整个输出都是一次打印的,而不是在"run"后休眠 5 秒。

有一种方便的方法可以达到预期。Combine 有一个延迟发布者,它会等到subscribe(on:)成为接收者。所以代码应该是这样的

var fetchUsers: Future<[User], Error> {
return Deferred {
Future { promise in
sleep(5)
promise(.success([User(name: "John"), User(name: "Jack")]))
}
}.eraseToAnyPublisher()
}
var cancellables = Set<AnyCancellable>()
fetchUsers
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
}).store(in: &cancellables)
print("run")

这样的代码不会停止主队列,输出将按预期

run
[User(name: "John"), User(name: "Jack")]
finished

相关内容

最新更新