我有一个名为ProcessorStack
的对象,它包含零个或多个Processor
子项目。ProcessorStack
和单个Processor
对象各自只有一个方法:
process(input: Value) -> Future<Value, Never>
我希望返回值是Future
而不是AnyPublisher
,以清楚地表明调用者应该只期望发出单个结果。其他对象只能访问ProcessorStack
,而不能访问它的Processor
子对象。下面是我希望发生的:
- 对象调用
ProcessorStack
:stack.process(value: someValue).sink { result in // Do something with the result }.store(in: &subscriptions)
ProcessorStack
使用reduce操作将所有子Processor
对象链在一起,并通过Future
返回最终结果:func process(value: Value) -> Future<Value, Never> { guard !childProcessors.isEmpty else { return Future { $0(.success(value)) } } let just = Just(value).eraseToAnyPublisher() childProcessors.reduce(just) { (publisher, processor) -> AnyPublisher<Value, Never> in publisher.flatMap { processor.process(value: $0).eraseToAnyPublisher() } } // Here's where I'm lost. }
我不能为我的生活弄清楚如何执行异步减少链,然后返回结果作为Future
。如果我将整个reduce操作包装在Future
初始化器中,我就会留下一个AnyPublisher<Value, Never>
,我必须以某种方式执行它,然后将结果传递给Future
的完成闭包。我不能在Future
的闭包中sink
它,因为我必须抓住从该闭包返回的可取消对象,否则整个过程立即停止。我不能将结果FlatMap到Future,因为它的类型是FlatMap<AnyPublisher<Value, Never>, Future<Value, Never>>
。如果我只使外部返回类型为AnyPublisher<Value, Never>
,我就可以完成所有这些,但我真的希望订阅者具有Future
语义。
你想做的事情的前提是错误的。虽然Future
最多只能返回一个结果,但在语义上并不代表这一点。它只是一个特定类型的发布者。
你应该返回一个AnyPublisher
在函数边界,而不是试图避免它,这将使你的代码更健壮的变化(例如,如果你需要包装Future
在Deferred
?——常用做法)
process(value: Value) -> AnyPublisher<Value, Never> {
...
}
如果订阅者只能处理单个结果,他们可以简单地确保使用first()
:
process(value)
.first()
.sink {...}
.store(in: &storage)
但是如果你坚持,你可以在Future
的闭包中使用.sink
,如果闭包捕获了对AnyCancellable
的引用并在完成时释放它:
process(value: Value) -> Future<Value, Never> {
guard !childProcessors.isEmpty else {
return Future { $0(.success(value)) }
}
let just = Just(value).eraseToAnyPublisher()
let combined = childProcessors.reduce(just) { (publisher, processor) -> AnyPublisher<Value, Never> in
publisher.flatMap { processor.process(value: $0).eraseToAnyPublisher() }
}
var c: AnyCancellable? = nil
return Future { promise in
c = combined.sink(receiveCompletion: {
withExtendedLifetime(c){}; c = nil
}) {
promise(.success($0))
}
}
}