Combine接收器内调用并发的捷径



我订阅了Combine发布者,但经常在.sink中调用并发任务。有更方便的方法吗?

import _Concurrency
import Combine
import Foundation
import PlaygroundSupport
var cancellable = Set<AnyCancellable>()
struct MyService {
private static let subject = PassthroughSubject<String, Never>()
init() {
Task {
try? await Task.sleep(until: .now + .seconds(2), clock: .suspending)
Self.subject.send("Publisher: " + Date.now.formatted())
}
}
func publisher() -> AnyPublisher<String, Never> {
Self.subject.eraseToAnyPublisher()
}
}
class MyClass {
let service: MyService
var cancellable: AnyCancellable?
init() {
service = MyService()
subscribe()
}
func subscribe() {
// HERE ===>
cancellable = service.publisher()
.sink { value in Task { [weak self] in await self?.doThings(value: value) } }
}
func doThings(value: String) async {
print(value)
try? await Task.sleep(until: .now + .seconds(2), clock: .suspending)
print("Things done!")
}
}

let test = MyClass()

PlaygroundPage.current.needsIndefiniteExecution = true

我想做的是无缝地调用.sink中的并发任务,比如:

// From:
service.publisher()
.sink { value in Task { [weak self] in await self?.doThings(value: value) }
// To:
service.publisher()
.sink { [weak self] value in await self?.doThings(value: value) }

您可以编写这样的扩展:

extension Publisher where Failure == Never {
func sinkAsync(receiveValue: @escaping ((Self.Output) async throws -> Void)) -> AnyCancellable {
sink { value in
Task {
try await receiveValue(value)
}
}
}
}

注意,它使用的闭包是async throws,这与Task.init的闭包相同。

";"本地";我认为,这样做的方法是将发布者转换为异步序列。

Task {
for await value in service.publisher().values {
await self.doThings(value: value)
}
}

请注意,通过这样做,您不需要管理可取消的!

最新更新