我创建了一个联合发布者链,看起来像这样:
let pub = getSomeAsyncData()
.mapError { ... }
.map { ... }
...
.flatMap { data in
let wsi = WebSocketInteraction(data, ...)
return wsi.subject
}
.share().eraseToAnyPublisher()
它是不同可能的网络请求和数据转换的流。调用代码想要订阅pub
以了解整个异步过程是成功还是失败。
我对flatMap
步骤的设计与WebSocketInteraction
感到困惑.这是我编写的一个帮助程序类。我不认为它的内部细节很重要,但它的目的是提供其subject
属性(PassthroughSubject
(作为链中的下一个发布者。在内部,WebSocketInteraction
使用URLSessionWebSocketTask
,与服务器通信,并发布到subject
。我喜欢flatMap
,但是您如何在发布者链的生命周期内保持这件作品的活力?
如果我将其存储在外部对象中(没问题(,那么我需要清理它。我可以在subject
完成时执行此操作,但如果调用方取消整个发布者链,则我不会收到完成事件。我是否需要使用Publisher.handleEvents
并侦听取消?这似乎有点丑陋。但也许没有其他办法...
.flatMap { data in
let wsi = WebSocketInteraction(data, ...)
self.currentWsi = wsi // store in containing object to keep it alive.
wsi.subject.sink(receiveCompletion: { self.currentWsi = nil })
wsi.subject.handleEvents(receiveCancel: {
wsi.closeWebSocket()
self.currentWsi = nil
})
有人在这里有什么好的"设计模式"吗?
我考虑过的一个设计是制作我自己的Publisher
。例如,它不是让WebSocketInteraction
出售PassthroughSubject
,而是可以符合Publisher
。我最终可能会这样做,但制作自定义组合Publisher
需要更多的工作,并且文档引导人们使用主题。若要创建自定义发布服务器,您必须实现PassthroughSubject
为您执行的一些操作,例如响应需求和取消,并保持状态以确保您最多完成一次,并且在此之后不会发送事件。
[编辑:澄清WebSocketInteraction
是我自己的班级。
目前还不清楚你在保持内在物体存活时遇到了什么问题。只要某物有很强的参照物,该对象就应该是活的。
它要么是将启动某个异步进程的外部对象,要么是通过self.subject.send(...)
保持对self
的强引用的内部闭包。
class WebSocketInteraction {
private let subject = PassthroughSubject<String, Error>()
private var isCancelled: Bool = false
init() {
// start some async work
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
if !isCancelled { self.subject.send("Done") } // <-- ref
}
}
// return a publisher that can cancel the operation when
var pub: AnyPublisher<String, Error> {
subject
.handleEvents(receiveCancel: {
print("cancel handler")
self.isCancelled = true // <-- ref
})
.eraseToAnyPublisher()
}
}
您应该能够根据需要使用它flatMap
,因为pub
属性返回了 publisher,并且内部闭包包含对self
的引用
let pub = getSomeAsyncData()
...
.flatMap { data in
let wsi = WebSocketInteraction(data, ...)
return wsi.pub
}