Swift并发等价于promise-resolver对是什么



使用PromiseKit库,可以一起创建promise和解析器函数,并将它们存储在类的实例上:

class ExampleClass {
    // Promise and resolver for the top news headline, as obtained from
    // some web service.
    private let (headlinePromise, headlineSeal) = Promise<String>.pending()
}

就像任何承诺一样,一旦价值可用,我们可以连锁headlinePromise来做一些工作:

headlinePromise.get { headline in
    updateUI(headline: headline)
}
// Some other stuff here

由于承诺尚未得到解决,get闭包的内容将在某个地方排队,控制将立即转移到"此处的其他内容"部分;除非承诺得到解决,否则不会调用updateUI

为了解决promise,实例方法可以调用headlineSeal:

makeNetworkRequest("https://news.example/headline").get { headline in
    headlineSeal.fulfill(headline)
}

承诺现在已经解决,任何一直在等待headlinePromise的承诺链都将继续。在这个ExampleClass实例的剩余生命中,任何像一样启动的承诺链

headlinePromise.get { headline in
    // ...
}

将立即开始执行。("立即"可能意味着"现在,同步",也可能意味着是"在下一次运行事件循环时";这一区别在这里对我来说并不重要。(由于promise只能解决一次,因此将来对headlineSeal.fulfill(_:)headlineSeal.reject(_:)的任何调用都将是无操作的。

问题

如何将这种模式惯用地转换为Swift并发("async/await"(?有一个被称为"promise"的对象和一个被称作"resolver"的函数并不重要;我要找的是一个具有以下属性的设置:

  1. 有些代码可以声明对某个异步可用状态的依赖,并在该状态可用之前屈服
  2. 该状态可能通过任何实例方法"实现">
  3. 一旦该状态可用,任何依赖于该状态的未来代码链都可以立即运行
  4. 一旦状态可用,其值就不可变;该状态不能再次变为不可用,也不能更改其值

我认为其中一些可以通过存储实例变量来实现

private let headlineTask: Task<String, Error>

然后等待的值

let headline = try await headlineTask.value

但我不确定Task应该如何初始化,或者应该如何"实现">

这里有一种复制Promise的方法,它可以由多个消费者等待,并由任何同步代码实现:

public final class Promise<Success: Sendable>: Sendable {
    typealias Waiter = CheckedContinuation<Success, Never>
    
    struct State {
        var waiters = [Waiter]()
        var result: Success? = nil
    }
    
    private let state = ManagedCriticalState(State())
    
    public init(_ elementType: Success.Type = Success.self) { }
    
    @discardableResult
    public func fulfill(with value: Success) -> Bool {
        return state.withCriticalRegion { state in
            if state.result == nil {
                state.result = value
                for waiters in state.waiters {
                    waiters.resume(returning: value)
                }
                state.waiters.removeAll()
                return false
            }
            return true
        }
    }
    
    public var value: Success {
        get async {
            await withCheckedContinuation { continuation in
                state.withCriticalRegion { state in
                    if let result = state.result {
                        continuation.resume(returning: result)
                    } else {
                        state.waiters.append(continuation)
                    }
                }
            }
        }
    }
}
extension Promise where Success == Void {
    func fulfill() -> Bool {
        return fulfill(with: ())
    }
}

ManagedCriticalState类型可以从SwiftAsyncAlgorithms包中找到。

我认为我得到了安全和正确的实现,但如果有人发现错误,我会更新答案。作为参考,我受到AsyncChannel和这篇博客文章的启发。

你可以这样使用它:

@main
enum App {
    static func main() async throws {
        let promise = Promise(String.self)
        
        // Delayed fulfilling.
        let fulfiller = Task.detached {
            print("Starting to wait...")
            try await Task.sleep(nanoseconds: 2_000_000_000)
            print("Promise fulfilled")
            promise.fulfill(with: "Done!")
        }
        
        let consumer = Task.detached {
            await (print("Promise resolved to '(promise.value)'"))
        }
        
        // Launch concurrent consumer and producer
        // and wait for them to complete.
        try await fulfiller.value
        await consumer.value
        
        // A promise can be fulfilled only once and
        // subsequent calls to `.value` immediatly return
        // with the previously resolved value.
        promise.fulfill(with: "Ooops")
        await (print("Promise still resolved to '(promise.value)'"))
    }
}

简短解释

在Swift Concurrency中,高级Task类型类似于Future/Promise(可以等待并暂停执行,直到解决为止(,但实际的解决方案无法从外部控制:必须组成内置的较低级别异步函数,如URLSession.data()Task.sleep()

然而,Swift Concurrency提供了一个(Checked|Unsafe)Continuation类型,它基本上充当Promise解析器。它是一个低级别的构建块,目的是将常规异步代码(例如基于回调的代码(迁移到Swift并发世界。

在上面的代码中,continuation由使用者创建(通过.value属性(并存储在Promise中。稍后,当结果可用时,完成存储的延续(使用.resume()(,从而恢复消费者的执行。结果也被缓存,这样,如果在调用.value时它已经可用,它将直接返回给被调用方。

当多次满足Promise时,当前行为是忽略后续调用并返回一个布尔值,指示是否已经满足Promise。可以使用其他API(陷阱、抛出错误等(

必须保护Promise的内部可变状态不受并发访问的影响,因为多个并发域可以同时尝试从中读取和写入。这可以通过常规锁定来实现(不过,我相信这可以通过actor来实现(。