暂停协程,直到条件为真



我有一个用例,我需要连接和断开与充当服务的类的连接。仅当服务已连接时,才能对服务执行操作。当服务连接或断开连接时,客户端会收到回调通知:

class Service {
    constructor(callback: ConnectionCallback) { ... }
    fun connect() {
        // Call callback.onConnected() some time after this method returns.
    }
    fun disconnect() {
        // Call callback.onConnectionSuspended() some time after this method returns.
    }
    fun isConnected(): Boolean { ... }
    fun performAction(actionName: String, callback: ActionCallback) {
        // Perform a given action on the service, failing with a fatal exception if called when the service is not connected.
    }
    interface ConnectionCallback {
        fun onConnected() // May be called multiple times
        fun onConnectionSuspended() // May be called multiple times
        fun onConnectionFailed()
    }
}

我想使用 Kotlin 协程为该Service类(我无法控制)编写一个包装器。这是ServiceWrapper的骨架:

class ServiceWrapper {
    private val service = Service(object : ConnectionCallback { ... })
    fun connect() {
        service.connect()
    }
    fun disconnect() {
        service.disconnect()
    }
    suspend fun performActionWhenConnected(actionName: String): ActionResult {
        suspendUntilConnected()
        return suspendCoroutine { continuation ->
            service.performAction(actionName, object : ActionCallback() {
                override fun onSuccess(result: ActionResult) {
                    continuation.resume(result)
                }
                override fun onError() {
                    continuation.resumeWithException(RuntimeException())
                }
            }
        }
    }
}

如何使用协程实现此suspendUntilConnected()行为?提前谢谢。

以下是实现它的方法:

class ServiceWrapper {
    @Volatile
    private var deferredUntilConnected = CompletableDeferred<Unit>()
    private val service = Service(object : ConnectionCallback {
        override fun onConnected() {
            deferredUntilConnected.complete(Unit)
        }
        override fun onConnectionSuspended() {
            deferredUntilConnected = CompletableDeferred()
        }
    })
    private suspend fun suspendUntilConnected() = deferredUntilConnected.await()
    ...
}

一般说明:仅仅因为服务在某个点已连接并不能保证在您使用它时它仍然会连接。

另一种StateFlow方法

  1. 首先定义一些"状态"
enum class ServiceState {
    CONNECTED, SUSPENDED, FAILED
}
  1. 将基于回调的代码转换为流
val connectionState = MutableStateFlow(ServiceState.FAILED)
private val service = Service(object : ConnectionCallback {
    override fun onConnected() {
        connectionState.value = ServiceState.CONNECTED
    }
    override fun onConnectionSuspended() {
        connectionState.value = ServiceState.SUSPENDED
    }
    override fun onConnectionFailed() {
        connectionState.value = ServiceState.FAILED
    }
})
  1. 复制并粘贴这些实用程序代码
class ConditionalAwait<T>(
    private val stateFlow: StateFlow<T>,
    private val condition: (T) -> Boolean
) {
    suspend fun await(): T {
        val nowValue = stateFlow.value
        return if (condition(nowValue)) {
            nowValue
        } else {
            stateFlow.first { condition(it) }
        }
    }
}
suspend fun <T> StateFlow<T>.conditionalAwait(condition: (T) -> Boolean): T =
    ConditionalAwait(this, condition).await()
  1. 用吧~
suspend fun performActionWhenConnected() {
    connectionState.conditionalAwait { it == ServiceState.CONNECTED }
    // other actions when service is connected
}
<小时 />
  1. 高级用法
suspend fun performActionWhenConnected() {
    val state = connectionState.conditionalAwait { 
        it == ServiceState.CONNECTED || it == ServiceState.FAILED
    } // keep suspended when Service.SUSPENDED
    if (state is ServiceState.CONNECTED) {
        // other actions when service is connected
    } else {
        // error handling
    }
}
  1. suspendCoroutine 会挂起当前正在运行的协程,因此无需预挂起。

获取挂起函数内的当前继续实例,并挂起当前正在运行的协程。

  1. 另一方面,我建议使用 async 并返回 Deferred .暂停并不是ServiceWrapper的工作。调用方应控制何时需要结果,从而控制何时在需要时挂起。
class ServiceWrapper {
    ...
    fun performAction(actionName: String): Deferred<ActionResult> = coroutineScope.async {
        suspendCoroutine { continuation ->
            service.performAction(actionName, object : ActionCallback() {
                override fun onSuccess(result: ActionResult) {
                    continuation.resume(result)
                }
                override fun onError() {
                    continuation.resumeWithException(RuntimeException())
                }
            }
        }
    }
}
  1. 如果由于某种原因,您必须在suspendCoroutine之前暂停,我建议使用CompletableJobDeferred应该返回一个结果。从我的角度来看,使用Deferred<Unit>似乎是一种反模式。

你处于一个挂起的函数中,为什么不这样:

while (!service.isConnected()) {
    delay(1000)
}

您可以在此语句中加入其他超时条件。

相关内容

  • 没有找到相关文章

最新更新