RxSwift - 确定 Observable 是否已被处置(disposed)



我正在尝试让一个向其客户端(Consumer)提供 Observable 的 Publisher,能够确定它的某个消费者何时处置(dispose)了对应的 Observable。

令人恼火的是,我的代码原本运行良好,直到我从 Consumer 代码中删除了 RxSwift 的 `.debug` 操作符。

有没有其他方法可以让它发挥作用?

/// Record kept by the publisher for each stream it has handed out.
private class Subscriber {
    /// Identifiers this subscriber asked to receive updates for.
    var ids: [Int]

    // This property exists so I can watch whether the observable has
    // gone nil (which I thought would happen when it's been disposed, but it
    // seems to happen immediately).
    weak var observable: Observable<[Updates]>?

    /// Creates a record for `ids` that weakly references `observable`.
    ///
    /// A class gets no memberwise initializer, so one is declared explicitly.
    init(ids: [Int], observable: Observable<[Updates]>?) {
        self.ids = ids
        self.observable = observable
    }
}
/// Vends per-client update streams derived from a shared relay and keeps a
/// list of `Subscriber` records for the streams it has handed out.
class Publisher {
    // Seeded with an empty map so the relay has an initial value. The empty map
    // becomes an empty array downstream, which `updatesStream(for:)` filters out.
    private let relay = BehaviorRelay<[Int: Updates]>(value: [:])
    private var subscribers: [Subscriber] = []

    /// Returns a stream of the updates whose keys are contained in `ids`.
    ///
    /// Every relay value is narrowed to the entries for `ids`; empty results are
    /// dropped. A `Subscriber` record for the returned stream is appended to
    /// `subscribers`.
    ///
    /// - Parameter ids: Keys of the updates the caller is interested in.
    /// - Returns: An observable emitting the non-empty matching updates.
    func updatesStream(for ids: [Int]) -> Observable<[Updates]> {
        let observable = relay
            .map { map in
                return map
                    .filter { ids.contains($0.key) }
                    .map { $0.value }
            }
            .filter { !$0.isEmpty }
            .asObservable()
        let subscriber = Subscriber(ids: ids, observable: observable)
        subscribers.append(subscriber)
        return observable
    }

    /// Prunes subscribers whose observable reference has gone nil, then pushes
    /// the latest updates into the relay.
    private func repeatTimer() {
        // The relay carries `[Int: Updates]`, so the payload must be keyed too.
        let updates: [Int: Updates] = .... // value elided in the original post
        // I need to be able to determine at this point whether the subscriber's
        // observable has been disposed of, so I can remove it from the list of
        // subscribers. However `subscriber.observable` is always nil here.
        // PS: I am happy for this to happen before the `repeatTimer` func fires.
        //
        // NOTE(review): the weak reference presumably goes nil as soon as the
        // caller stops holding the returned Observable, which says nothing about
        // whether its subscription was disposed. Confirm against RxSwift's
        // retention behaviour before relying on this check.
        subscribers.removeAll(where: { subscriber in
            return subscriber.observable == nil
        })
        relay.accept(updates)
    }
}

/// Example consumer: subscribes to a `Publisher` stream and ties the
/// subscription's lifetime to an optional `DisposeBag`.
class Client {
    private let publisher = Publisher()
    private var disposeBag: DisposeBag?

    /// Installs a fresh dispose bag and subscribes to updates for ids 1, 2 and 3,
    /// printing every batch that arrives.
    func startWatching() {
        let bag = DisposeBag()
        // The stored bag is swapped before subscribing, so any previously held
        // bag is released first.
        disposeBag = bag

        // Author's observation: with a `.debug("Consumer")` operator in this
        // chain things work OK; without it `Publisher.Subscriber.observable`
        // immediately becomes nil.
        let stream = publisher.updatesStream(for: [1, 2, 3])
        let subscription = stream.subscribe(onNext: { print($0) })
        subscription.disposed(by: bag)
    }

    /// Drops the dispose bag held by this client.
    func stopWatching() {
        disposeBag = nil
    }
}

我认为这是一个非常糟糕的主意,但它确实解决了所提出的问题……如果我必须把这段代码放进自己的项目中,我会非常担心竞态条件(race condition)……

/// Bookkeeping record for one vended stream: the ids it covers and how many
/// subscriptions to it are currently live.
///
/// This is a reference type on purpose. With a struct, an instance captured by
/// a closure and a copy stored in a collection are separate values, so
/// increments made through the closure never reach the stored copy.
final class Subscriber {
    /// Identifiers this stream delivers updates for.
    let ids: [Int]

    /// Number of currently active subscriptions; guard access with `lock`.
    var subscribeCount: Int

    /// Lock protecting `subscribeCount`.
    let lock = NSRecursiveLock()

    /// Creates a record for `ids`.
    ///
    /// - Parameters:
    ///   - ids: Identifiers the stream covers.
    ///   - subscribeCount: Initial subscription count; defaults to 0, matching
    ///     the memberwise initializer the struct version exposed.
    init(ids: [Int], subscribeCount: Int = 0) {
        self.ids = ids
        self.subscribeCount = subscribeCount
    }
}
/// Vends filtered update streams and records, per vended stream, how many
/// subscriptions are currently live so that idle records can be pruned.
class Publisher {
    private let relay = BehaviorRelay<[Int: Updates]>(value: [:])
    private var subscribers: [Subscriber] = []

    /// Returns a stream of the updates whose keys are contained in `ids`.
    ///
    /// The stream's subscribe/dispose hooks adjust the subscriber record's
    /// `subscribeCount` under its lock.
    ///
    /// - Parameter ids: Keys of the updates the caller is interested in.
    /// - Returns: An observable emitting the non-empty matching updates.
    func updatesStream(for ids: [Int]) -> Observable<[Updates]> {
        var subscriber = Subscriber(ids: ids)
        // Set lookup instead of scanning the array for every entry of every value.
        let wantedIDs = Set(ids)
        // `do` already yields an `Observable`, so no `asObservable()` is needed.
        let observable = relay
            .map { map in
                return map
                    .filter { wantedIDs.contains($0.key) }
                    .map { $0.value }
            }
            .filter { !$0.isEmpty }
            .do(
                onSubscribe: {
                    subscriber.lock.lock()
                    defer { subscriber.lock.unlock() }
                    subscriber.subscribeCount += 1
                },
                onDispose: {
                    subscriber.lock.lock()
                    defer { subscriber.lock.unlock() }
                    subscriber.subscribeCount -= 1
                })
        // NOTE(review): pruning only works if the element stored here observes
        // the same `subscribeCount` the hooks above mutate; verify that
        // `Subscriber` shares state across copies.
        subscribers.append(subscriber)
        return observable
    }

    /// Removes every subscriber record that currently has no live subscription.
    private func repeatTimer() {
        subscribers.removeAll(where: { subscriber in
            // Read under the same lock the subscribe/dispose hooks write under.
            subscriber.lock.lock()
            defer { subscriber.lock.unlock() }
            return subscriber.subscribeCount == 0
        })
    }
}