如何使用Combine+Swift复制PromiseKit风格的链式异步流



我在一个项目中成功地使用了PromiseKit,直到Xcode 11测试版破坏了PK v7。为了减少外部依赖,我决定废弃PromiseKit。处理链式异步代码的最佳替代方案似乎是使用新的Combine框架的Futures。

我正在努力使用Combine 复制简单的PK语法

例如。简单PromiseKit链式异步调用语法

getAccessCodeFromSyncProvider.then{accessCode in startSync(accessCode)}.then{popToRootViewController}.catch{handleError(error)}

我理解:

异步/等待的Swift标准库实现将解决这个问题(异步/等待还不存在,尽管Chris Latter自己也参与了很多讨论)

我可以使用信号载体(容易出错?)进行复制

<blockquote\>flatMap可用于链接期货

我想要的异步代码应该能够按需调用,因为它涉及到确保用户登录。我正在努力解决两个概念问题。

  1. 如果我将Futures封装在一个方法中,用sink来处理结果,那么在sink调用订阅者之前,该方法似乎超出了范围。

  2. 由于Futures只执行一次,我担心如果我多次调用该方法,我只会从第一次调用中得到旧的、过时的结果。为了解决这个问题,也许我会使用PassthroughSubject?这允许按需调用发布服务器。

问题:

  1. 是否必须保留调用方法
  2. 我如何使用Swift标准库复制简单的链式异步,然后将其嵌入到我可以按需调用的Swift实例方法中,以从顶部重新启动链式异步调用
//how is this done using Combine?
func startSync() {
getAccessCodeFromSyncProvider.then{accessCode in startSync(accessCode)}.catch{\handle error here}
}

这并不是你整个问题的真正答案,只是关于如何开始使用Combine的部分。我将演示如何使用Combine框架链接两个异步操作:

print("start")
Future<Bool,Error> { promise in
delay(3) {
promise(.success(true))
}
}
.handleEvents(receiveOutput: {_ in print("finished 1")})
.flatMap {_ in
Future<Bool,Error> { promise in
delay(3) {
promise(.success(true))
}
}
}
.handleEvents(receiveOutput: {_ in print("finished 2")})
.sink(receiveCompletion: {_ in}, receiveValue: {_ in print("done")})
.store(in:&self.storage) // storage is a persistent Set<AnyCancellable>

首先,关于持久性问题的答案是:最后一个订阅者必须持久化,而实现这一点的方法是使用.store方法。通常,您将有一个Set<AnyCancellable>作为属性,如这里所示,您只需调用.store作为管道中的最后一个东西,即可将您的订户放入其中。

接下来,在这个管道中,我使用.handleEvents只是为了在管道移动时给自己一些打印输出。这些只是诊断,在真正的实现中是不存在的。所有的print语句都纯粹是为了让我们可以谈谈这里发生的事情。

那么会发生什么呢?

start
finished 1 // 3 seconds later
finished 2 // 3 seconds later
done

您可以看到,我们已经将两个异步操作链接起来,每个操作需要3秒钟。

我们是怎么做到的?我们从Future开始,它必须在完成时使用Result作为完成处理程序来调用其传入的promise方法。之后,我们使用.flatMap生成另一个Future并将其投入运行,再次执行相同的操作。

因此,结果并不漂亮(像PromiseKit),但它是一个异步操作链。

在Combine之前,我们可能会使用某种Operation/OperationQueue依赖关系来实现这一点,这会很好地工作,但会比PromiseKit的直接易读性更低。

稍微逼真一点

说了这么多,这里有一个稍微现实一点的重写:

var storage = Set<AnyCancellable>()
func async1(_ promise:@escaping (Result<Bool,Error>) -> Void) {
delay(3) {
print("async1")
promise(.success(true))
}
}
func async2(_ promise:@escaping (Result<Bool,Error>) -> Void) {
delay(3) {
print("async2")
promise(.success(true))
}
}
override func viewDidLoad() {
print("start")
Future<Bool,Error> { promise in
self.async1(promise)
}
.flatMap {_ in
Future<Bool,Error> { promise in
self.async2(promise)
}
}
.sink(receiveCompletion: {_ in}, receiveValue: {_ in print("done")})
.store(in:&self.storage) // storage is a persistent Set<AnyCancellable>
}

正如您所看到的,我们的Future发布者只需传递promise回调;实际上,他们不一定是打电话给他们的人。因此,promise回调可以在任何地方调用,在此之前我们不会继续。

因此,您可以很容易地看到如何用一个真正的异步操作来替换人工delay,该操作以某种方式控制了这个promise回调,并可以在它完成时调用它。此外,我的promise Result类型纯粹是人为的,但您可以再次看到它们是如何用于在管道中传递有意义的东西的。当我说promise(.success(true))时,这会导致true弹出管道的末尾;我们在这里忽略了这一点,但它可能是某种完全有用的价值,甚至可能是下一个未来。

(还要注意,我们可以在链中的任何点插入.receive(on: DispatchQueue.main),以确保在主线程上立即启动接下来的内容。)

稍微整洁一点

我还想到,通过将我们的Future发布者转移到常量中,我们可以使语法更整洁,也许更接近PromiseKit可爱的简单链。不过,如果你这样做了,你可能应该把它们包装在Deferred publishers中,以防止过早评估。例如:

var storage = Set<AnyCancellable>()
func async1(_ promise:@escaping (Result<Bool,Error>) -> Void) {
delay(3) {
print("async1")
promise(.success(true))
}
}
func async2(_ promise:@escaping (Result<Bool,Error>) -> Void) {
delay(3) {
print("async2")
promise(.success(true))
}
}
override func viewDidLoad() {
print("start")
let f1 = Deferred{Future<Bool,Error> { promise in
self.async1(promise)
}}
let f2 = Deferred{Future<Bool,Error> { promise in
self.async2(promise)
}}
// this is now extremely neat-looking
f1.flatMap {_ in f2 }
.receive(on: DispatchQueue.main)
.sink(receiveCompletion: {_ in}, receiveValue: {_ in print("done")})
.store(in:&self.storage) // storage is a persistent Set<AnyCancellable>
}

matt的答案是正确的,使用flatMap来连锁promise。我养成了在使用PromiseKit时返回promise的习惯,并将其移植到Combine(返回Futures)中。

我发现它使代码更容易阅读。这是matt的最后一个推荐例子:

var storage = Set<AnyCancellable>()
func async1() -> Future<Bool, Error> {
Future { promise in
delay(3) {
print("async1")
promise(.success(true))
}
}
}
func async2() -> Future<Bool, Error> {
Future { promise in
delay(3) {
print("async2")
promise(.success(true))
}
}
}
override func viewDidLoad() {
print("start")
async1()
.flatMap { _ in async2() }
.receive(on: DispatchQueue.main)
.sink(receiveCompletion: {_ in}, receiveValue: {_ in print("done")})
.store(in:&self.storage) // storage is a persistent Set<AnyCancellable>
}

注意,AnyPublisher也将作为返回值,因此您可以抽象掉Future,并让它返回AnyPublisher<Bool, Error>

func async2() -> AnyPublisher<Bool, Error> {
Future { promise in
delay(3) {
print("async2")
promise(.success(true))
}
}.eraseToAnyPubilsher()
}

此外,如果您想使用类似PromiseKit的语法,以下是Publisher 的一些扩展

我正在使用它在项目中从PromiseKit无缝切换到Combine

extension Publisher {

func then<T: Publisher>(_ closure: @escaping (Output) -> T) -> Publishers.FlatMap<T, Self>
where T.Failure == Self.Failure {
flatMap(closure)
}

func asVoid() -> Future<Void, Error> {
return Future<Void, Error> { promise in
let box = Box()
let cancellable = self.sink { completion in
if case .failure(let error) = completion {
promise(.failure(error))
} else if case .finished = completion {
box.cancellable = nil
}
} receiveValue: { value in
promise(.success(()))
}
box.cancellable = cancellable
}
}

@discardableResult
func done(_ handler: @escaping (Output) -> Void) -> Self {
let box = Box()
let cancellable = self.sink(receiveCompletion: {compl in
if case .finished = compl {
box.cancellable = nil
}
}, receiveValue: {
handler($0)
})
box.cancellable = cancellable
return self
}

@discardableResult
func `catch`(_ handler: @escaping (Failure) -> Void) -> Self {
let box = Box()
let cancellable = self.sink(receiveCompletion: { compl in
if case .failure(let failure) = compl {
handler(failure)
} else if case .finished = compl {
box.cancellable = nil
}
}, receiveValue: { _ in })
box.cancellable = cancellable
return self
}

@discardableResult
func finally(_ handler: @escaping () -> Void) -> Self {
let box = Box()
let cancellable = self.sink(receiveCompletion: { compl in
if case .finished = compl {
handler()
box.cancellable = nil
}
}, receiveValue: { _ in })
box.cancellable = cancellable
return self
}
}
fileprivate class Box {
var cancellable: AnyCancellable?
}

这里有一个使用示例:

func someSync() {
Future<Bool, Error> { promise in
delay(3) {
promise(.success(true))
}
}
.then { result in
Future<String, Error> { promise in
promise(.success("111"))
}
}
.done { string in
print(string)
}
.catch { err in
print(err.localizedDescription)
}
.finally {
print("Finished chain")
}
}

您可以将此框架用于Swift协程,它也可以与Combine-https://github.com/belozierov/SwiftCoroutine

DispatchQueue.main.startCoroutine {
let future: Future<Bool, Error>
let coFuture = future.subscribeCoFuture()
let bool = try coFuture.await()
}

最新更新