我订阅了 Combine 发布者,但经常需要在 `.sink` 中调用异步(并发)任务。有没有更方便的写法?
import _Concurrency
import Combine
import Foundation
import PlaygroundSupport
// Playground-level bag for Combine subscriptions; nothing else in this snippet references it.
var cancellable: Set<AnyCancellable> = []
/// Demo service that emits one timestamped message roughly two seconds after
/// an instance is created.
struct MyService {
    /// Subject shared by every `MyService` value; all instances publish into it.
    private static let subject = PassthroughSubject<String, Never>()

    /// Starts an unstructured task that sleeps for two seconds on the suspending
    /// clock and then sends a "Publisher: <date>" string through the shared subject.
    init() {
        Task {
            let deadline = SuspendingClock.Instant.now + .seconds(2)
            // A thrown sleep error (e.g. cancellation) is discarded by `try?`,
            // so the value is sent either way.
            try? await Task.sleep(until: deadline, clock: .suspending)
            let timestamp = Date.now.formatted()
            Self.subject.send("Publisher: " + timestamp)
        }
    }

    /// Returns a type-erased publisher backed by the shared subject.
    func publisher() -> AnyPublisher<String, Never> {
        return Self.subject.eraseToAnyPublisher()
    }
}
/// Demo consumer that subscribes to `MyService` and runs an async handler for
/// every value it receives.
class MyClass {
    let service: MyService
    /// Holds the Combine subscription created by `subscribe()`.
    var cancellable: AnyCancellable?

    /// Creates the service and immediately subscribes to its publisher.
    init() {
        service = MyService()
        subscribe()
    }

    /// Subscribes to the service's publisher and starts a new task per value.
    func subscribe() {
        // The sink closure is owned by `cancellable`, which `self` owns, so it must
        // capture `self` weakly. Declaring `[weak self]` only on the inner `Task`
        // closure is not enough: the outer closure would still capture `self`
        // strongly and form a retain cycle.
        cancellable = service.publisher()
            .sink { [weak self] value in
                Task { [weak self] in
                    await self?.doThings(value: value)
                }
            }
    }

    /// Prints `value`, sleeps for two seconds on the suspending clock, then
    /// prints a completion message.
    ///
    /// - Parameter value: The string received from the publisher.
    func doThings(value: String) async {
        print(value)
        // A thrown sleep error (e.g. cancellation) is discarded by `try?`.
        try? await Task.sleep(until: .now + .seconds(2), clock: .suspending)
        print("Things done!")
    }
}
// Create the demo object; its initializer subscribes to the service's publisher.
let test = MyClass()
// Keep the playground running so the asynchronous work has time to finish.
PlaygroundPage.current.needsIndefiniteExecution = true
我希望能在 `.sink` 中直接(无缝地)调用异步函数,比如:
// From (current form: every value is wrapped in a Task by hand):
service.publisher()
    .sink { value in Task { [weak self] in await self?.doThings(value: value) } }
// To (desired form; does not compile as written, because the closure passed to
// `sink` is synchronous and cannot contain `await`):
service.publisher()
    .sink { [weak self] value in await self?.doThings(value: value) }
您可以编写这样的扩展:
extension Publisher where Failure == Never {
    /// Attaches a subscriber that runs an async, throwing handler for each value.
    ///
    /// Every received value starts its own unstructured `Task`. The tasks are not
    /// stored or awaited, so an error thrown by `receiveValue` is dropped.
    ///
    /// - Parameter receiveValue: The async closure to run for each published value.
    /// - Returns: The cancellable for the underlying `sink` subscription.
    func sinkAsync(receiveValue: @escaping ((Self.Output) async throws -> Void)) -> AnyCancellable {
        let spawnHandler: (Output) -> Void = { output in
            Task { try await receiveValue(output) }
        }
        return sink(receiveValue: spawnHandler)
    }
}
注意,它接受的闭包是 `async throws` 的,这与 `Task.init` 的闭包签名相同。
我认为,更“原生”的做法是把发布者转换为异步序列(通过 `.values`)。
// Consume the publisher as an async sequence inside an unstructured task.
// The task handle is not stored here, so nothing in this snippet can cancel it.
Task {
// `.values` exposes the publisher's output as an async sequence; each value is
// handled to completion before the next loop iteration starts.
for await value in service.publisher().values {
// NOTE(review): `self` is referenced strongly inside the task — confirm this lifetime is intended.
await self.doThings(value: value)
}
}
请注意,这样做就不需要再管理 AnyCancellable 了!不过,只要发布者没有结束,这个 Task 就会一直运行并持有 self;如需提前停止,请保存该 Task 并在合适的时机调用 cancel()。