如何用多个流包装单个回调



我有一个第三方库,在那里你可以订阅一些主题以接收该话题的更新(带回调(。但问题是,当您再次调用具有相同主题的subscribe方法时,以前的订阅会被新的订阅自动替换。

现在,我想将回调替换为Flow。所以我写了以下代码:

val flow: Flow<Message> = callbackFlow {
lib.subscribe(topic) { message -> offer(message) }
awaitClose { lib.unsubscribe(topic) }
}

但当我多次呼叫collect时,只有最后一次收到消息。

那么我该如何做到这一点:

  • 对同一主题有多个订阅
  • 当最后一个订阅者关闭/取消订阅时,取消订阅给定的主题

我已经查看了BroadcastChannel,但找不到任何东西来解决第二个需求。我刚刚开始探索Flow,所以可能有一些简单的包装器我只是错过了。

如果您将Flow转换为SharedFlow(当然,您仍然可以将其公开为更抽象的Flow类型(,则将添加后续订阅者,而无需重新执行传递给callbackFlow()的块,这意味着库的原始订阅者不会被替换。

Kotlin Flow库提供了Flow<T>.shareIn(): SharedFlow<T>扩展,这使得转换为SharedFlow非常容易。

这个答案还有另一个重要部分。当您调用shareIn()时,您需要将SharingStarted.WhileSubscribed作为参数之一传递。这在您的用例中是至关重要的,因为如果没有这一点,awaitClose()可能不会被调用,这意味着您将不会从库中取消订阅。对于WhileSubscribed,完全取消订阅SharedFlow也将导致取消订阅原始Flow,这将触发对awaitClose()的调用。

您的代码可能如下所示:

val flow: Flow<Message> = callbackFlow {
lib.subscribe(topic) { message -> offer(message) }
awaitClose { lib.unsubscribe(topic) }
}.shareIn(coroutineScope, SharingStarted.WhileSubscribed(), replay = 0)

来自shareIn()上的文档:

WhileSubscribed((--当第一个订阅者出现时启动上游流,当最后一个订阅者消失时立即停止。。。

最新更新