我正在尝试将一些组合友好代码转换为async/await。在这篇文章中,我将使用一个从真实代码中截取的简单示例,以保持讨论的简单性。
import Combine
import Foundation
enum State {
case a
case b
case c
}
class StateMachine: Publisher {
typealias Output = State
typealias Failure = Error
private let currentState = CurrentValueSubject<State, Error>(State.a)
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
currentState.receive(subscriber: subscriber)
}
func gotoB() { currentState.value = .b }
func gotoC() { currentState.value = .c }
}
let sm = StateMachine()
let c = sm.sink {
print("Failed with ($0)")
}
receiveValue: {
print("New value ($0)")
}
sm.gotoB()
sm.gotoC()
因此,在这段代码中(在游乐场中工作),我希望StateMachine
充当发布者,以便它可以直接订阅。为了实现这一点,我使用内部CurrentValueSubject
并将发布者的receive(subscriber:)
转发给它。
但是当我将class StateMachine:
更改为actor StateMachine:
actor StateMachine: Publisher {
//...
}
// ...
await sm.gotoB()
await.sm.gotoC()
此代码不再编译并抛出以下错误:
expression failed to parse:
error: SwiftFormat.playground:18:10: error: actor-isolated instance method 'receive(subscriber:)' cannot be used to satisfy nonisolated protocol requirement
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
^
SwiftFormat.playground:18:10: note: add 'nonisolated' to 'receive(subscriber:)' to make this instance method not isolated to the actor
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
^
nonisolated
Combine.Publisher:5:10: note: 'receive(subscriber:)' declared here
func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
^
现在我可以按建议添加nonisolated
,但我不确定这不会破坏actor
引入的并发处理,因为没有什么可以阻止并发访问内部主题。
所以有一种方法来保持类型作为一个组合Publisher
或有一些其他异步/离开的方法,我只是没有读到现在是首选的方法?
首先要问的是"什么是异步的?">
我不认为在这种情况下gotoA
或gotoB
是异步的。异步的是对当前值的订阅。
消费者将订阅它,然后在稍后的某个时间点,它将更改,他们将更新。
在async await中,这可以建模为AsyncStream
。
你可以这样访问一个异步流:
for await state in stateMachine.values {
print("received value (state)")
}
因此,为了使此工作,我们需要使stateMachine
给出AsyncStream
。
这样就可以了…
enum State {
case a
case b
case c
}
class StateMachine {
var subscriptions: [UUID: AsyncStream<State>.Continuation] = [:] // 1.
var currentState = State.a {
didSet {
subscriptions.allValues.forEach {
$0.yield(currentState)
}
}
} // 2.
var values: AsyncStream<State> {
AsyncStream { continuation in
let id = UUID()
subscriptions[id] = continuation
continuation.yield(currentValue)
continuation.onTermination = { [weak self] _ in
self?.subscriptions.removeValue(forKey: id)
}
}
} // 3.
func gotoB() {
currentState = .b
}
}
在此,我们…
- 创建一个位置来存储任何传入的订阅
- 确保当当前状态改变时,我们更新所有订阅者
- 当一个新的订阅者被添加时,我们给它一个AsyncStream来监听
这是您需要创建的内容的开始。如果你的应用程序中有多个地方需要订阅相同的状态,你可能需要使用某种Singleton
访问。
但这应该给你一个初步的想法。我们在我目前正在开发的应用程序中使用了与此非常相似的东西。有时是为了创建可订阅缓存。有时是为了响应错误消息等…