Swift将Publisher类型合并为async/await



我正在尝试将一些组合友好代码转换为async/await。在这篇文章中,我将使用一个从真实代码中截取的简单示例,以保持讨论的简单性。


import Combine
import Foundation
enum State {
case a
case b
case c
}
class StateMachine: Publisher {
typealias Output = State
typealias Failure = Error
private let currentState = CurrentValueSubject<State, Error>(State.a)
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
currentState.receive(subscriber: subscriber)
}
func gotoB() { currentState.value = .b }
func gotoC() { currentState.value = .c }
}
let sm = StateMachine()
let c = sm.sink {
print("Failed with ($0)")
}
receiveValue: {
print("New value ($0)")
}
sm.gotoB()
sm.gotoC()

因此,在这段代码中(在游乐场中工作),我希望StateMachine充当发布者,以便它可以直接订阅。为了实现这一点,我使用内部CurrentValueSubject并将发布者的receive(subscriber:)转发给它。

但是当我将class StateMachine:更改为actor StateMachine:

actor StateMachine: Publisher {
//...
}
// ...
await sm.gotoB()
await.sm.gotoC()

此代码不再编译并抛出以下错误:

expression failed to parse:
error: SwiftFormat.playground:18:10: error: actor-isolated instance method 'receive(subscriber:)' cannot be used to satisfy nonisolated protocol requirement
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
^
SwiftFormat.playground:18:10: note: add 'nonisolated' to 'receive(subscriber:)' to make this instance method not isolated to the actor
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
^
nonisolated 
Combine.Publisher:5:10: note: 'receive(subscriber:)' declared here
func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
^

现在我可以按建议添加nonisolated,但我不确定这不会破坏actor引入的并发处理,因为没有什么可以阻止并发访问内部主题。

所以有一种方法来保持类型作为一个组合Publisher或有一些其他异步/离开的方法,我只是没有读到现在是首选的方法?

首先要问的是"什么是异步的?">

我不认为在这种情况下gotoAgotoB是异步的。异步的是对当前值的订阅。

消费者将订阅它,然后在稍后的某个时间点,它将更改,他们将更新。

在async await中,这可以建模为AsyncStream

你可以这样访问一个异步流:

for await state in stateMachine.values {
print("received value (state)")
}

因此,为了使此工作,我们需要使stateMachine给出AsyncStream

这样就可以了…

enum State {
case a
case b
case c
}
class StateMachine {
var subscriptions: [UUID: AsyncStream<State>.Continuation] = [:] // 1.
var currentState = State.a {
didSet {
subscriptions.allValues.forEach {
$0.yield(currentState)
}
}
} // 2.
var values: AsyncStream<State> {
AsyncStream { continuation in
let id = UUID()
subscriptions[id] = continuation
continuation.yield(currentValue)
continuation.onTermination = { [weak self] _ in
self?.subscriptions.removeValue(forKey: id)
}
}
} // 3.
func gotoB() {
currentState = .b
}
}

在此,我们…

  1. 创建一个位置来存储任何传入的订阅
  2. 确保当当前状态改变时,我们更新所有订阅者
  3. 当一个新的订阅者被添加时,我们给它一个AsyncStream来监听

这是您需要创建的内容的开始。如果你的应用程序中有多个地方需要订阅相同的状态,你可能需要使用某种Singleton访问。

但这应该给你一个初步的想法。我们在我目前正在开发的应用程序中使用了与此非常相似的东西。有时是为了创建可订阅缓存。有时是为了响应错误消息等…