在联合发布者链中,如何在取消或完成之前保持内部对象的活动状态?



我创建了一个联合发布者链,看起来像这样:

let pub = getSomeAsyncData()
.mapError { ... }
.map { ... }
...
.flatMap { data in 
let wsi = WebSocketInteraction(data, ...)
return wsi.subject
}
.share().eraseToAnyPublisher()

它是不同可能的网络请求和数据转换的流。调用代码想要订阅pub以了解整个异步过程是成功还是失败。

我对flatMap步骤的设计与WebSocketInteraction感到困惑.这是我编写的一个帮助程序类。我不认为它的内部细节很重要,但它的目的是提供其subject属性(PassthroughSubject(作为链中的下一个发布者。在内部,WebSocketInteraction使用URLSessionWebSocketTask,与服务器通信,并发布到subject。我喜欢flatMap,但是您如何在发布者链的生命周期内保持这件作品的活力?

如果我将其存储在外部对象中(没问题(,那么我需要清理它。我可以在subject完成时执行此操作,但如果调用方取消整个发布者链,则我不会收到完成事件。我是否需要使用Publisher.handleEvents并侦听取消?这似乎有点丑陋。但也许没有其他办法...

.flatMap { data in 
let wsi = WebSocketInteraction(data, ...)
self.currentWsi = wsi  // store in containing object to keep it alive.
wsi.subject.sink(receiveCompletion: { self.currentWsi = nil })
wsi.subject.handleEvents(receiveCancel: {
wsi.closeWebSocket()
self.currentWsi = nil
})

有人在这里有什么好的"设计模式"吗?

我考虑过的一个设计是制作我自己的Publisher。例如,它不是让WebSocketInteraction出售PassthroughSubject,而是可以符合Publisher。我最终可能会这样做,但制作自定义组合Publisher需要更多的工作,并且文档引导人们使用主题。若要创建自定义发布服务器,您必须实现PassthroughSubject为您执行的一些操作,例如响应需求和取消,并保持状态以确保您最多完成一次,并且在此之后不会发送事件。

[编辑:澄清WebSocketInteraction是我自己的班级。

目前还不清楚你在保持内在物体存活时遇到了什么问题。只要某物有很强的参照物,该对象就应该是活的。

它要么是将启动某个异步进程的外部对象,要么是通过self.subject.send(...)保持对self的强引用的内部闭包。

class WebSocketInteraction {
private let subject = PassthroughSubject<String, Error>()
private var isCancelled: Bool = false
init() {
// start some async work
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
if !isCancelled { self.subject.send("Done") } // <-- ref
} 
}
// return a publisher that can cancel the operation when
var pub: AnyPublisher<String, Error> {
subject
.handleEvents(receiveCancel: {
print("cancel handler")
self.isCancelled = true  // <-- ref
})
.eraseToAnyPublisher()
}
}

您应该能够根据需要使用它flatMap,因为pub属性返回了 publisher,并且内部闭包包含对self的引用

let pub = getSomeAsyncData()
...
.flatMap { data in 
let wsi = WebSocketInteraction(data, ...)
return wsi.pub
}

相关内容

最新更新