我有一个第三方库,在那里你可以订阅一些主题以接收该话题的更新(带回调(。但问题是,当您再次调用具有相同主题的subscribe方法时,以前的订阅会被新的订阅自动替换。
现在,我想将回调替换为Flow
。所以我写了以下代码:
val flow: Flow<Message> = callbackFlow {
lib.subscribe(topic) { message -> offer(message) }
awaitClose { lib.unsubscribe(topic) }
}
但当我多次呼叫collect
时,只有最后一次收到消息。
那么我该如何做到这一点:
- 对同一主题有多个订阅
- 当最后一个订阅者关闭/取消订阅时,取消订阅给定的主题
我已经查看了BroadcastChannel
,但找不到任何东西来解决第二个需求。我刚刚开始探索Flow
,所以可能有一些简单的包装器我只是错过了。
如果您将Flow
转换为SharedFlow
(当然,您仍然可以将其公开为更抽象的Flow
类型(,则将添加后续订阅者,而无需重新执行传递给callbackFlow()
的块,这意味着库的原始订阅者不会被替换。
Kotlin Flow库提供了Flow<T>.shareIn(): SharedFlow<T>
扩展,这使得转换为SharedFlow
非常容易。
这个答案还有另一个重要部分。当您调用shareIn()
时,您需要将SharingStarted.WhileSubscribed
作为参数之一传递。这在您的用例中是至关重要的,因为如果没有这一点,awaitClose()
可能不会被调用,这意味着您将不会从库中取消订阅。对于WhileSubscribed
,完全取消订阅SharedFlow
也将导致取消订阅原始Flow
,这将触发对awaitClose()
的调用。
您的代码可能如下所示:
val flow: Flow<Message> = callbackFlow {
lib.subscribe(topic) { message -> offer(message) }
awaitClose { lib.unsubscribe(topic) }
}.shareIn(coroutineScope, SharingStarted.WhileSubscribed(), replay = 0)
来自shareIn()
上的文档:
WhileSubscribed((--当第一个订阅者出现时启动上游流,当最后一个订阅者消失时立即停止。。。