如何在 kotlin 协程上重试指数退避



我正在使用 kotlin 协程进行网络请求,使用扩展方法在这样的改造中调用类

public suspend fun <T : Any> Call<T>.await(): T {
return suspendCancellableCoroutine { continuation -> 
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>?, response: Response<T?>) {
if (response.isSuccessful) {
val body = response.body()
if (body == null) {
continuation.resumeWithException(
NullPointerException("Response body is null")
)
} else {
continuation.resume(body)
}
} else {
continuation.resumeWithException(HttpException(response))
}
}
override fun onFailure(call: Call<T>, t: Throwable) {
// Don't bother with resuming the continuation if it is already cancelled.
if (continuation.isCancelled) return
continuation.resumeWithException(t)
}
})
registerOnCompletion(continuation)
}
}

然后从呼叫端我正在使用上面的方法,如下所示

private fun getArticles()  = launch(UI) {
loading.value = true
try {
val networkResult = api.getArticle().await()
articles.value =  networkResult
}catch (e: Throwable){
e.printStackTrace()
message.value = e.message
}finally {
loading.value = false
}
}

我想在某些情况下以指数方式重试此 API 调用,即(IOException)我如何实现它?

我建议为您的重试逻辑编写一个辅助程序高阶函数。您可以使用以下实现作为开始:

suspend fun <T> retryIO(
times: Int = Int.MAX_VALUE,
initialDelay: Long = 100, // 0.1 second
maxDelay: Long = 1000,    // 1 second
factor: Double = 2.0,
block: suspend () -> T): T
{
var currentDelay = initialDelay
repeat(times - 1) {
try {
return block()
} catch (e: IOException) {
// you can log an error here and/or make a more finer-grained
// analysis of the cause to see if retry is needed
}
delay(currentDelay)
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
}
return block() // last attempt
}

使用此函数非常直接:

val networkResult = retryIO { api.getArticle().await() }

您可以根据具体情况更改重试参数,例如:

val networkResult = retryIO(times = 3) { api.doSomething().await() }

您还可以完全更改retryIO的实现以满足应用程序的需求。例如,您可以对所有重试参数进行硬编码,摆脱重试次数限制,更改默认值等。

这里有一个带有FlowretryWhen函数的示例

RetryWhen扩展名 :

fun <T> Flow<T>.retryWhen(
@FloatRange(from = 0.0) initialDelay: Float = RETRY_INITIAL_DELAY,
@FloatRange(from = 1.0) retryFactor: Float = RETRY_FACTOR_DELAY,
predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long, delay: Long) -> Boolean
): Flow<T> = this.retryWhen { cause, attempt ->
val retryDelay = initialDelay * retryFactor.pow(attempt.toFloat())
predicate(cause, attempt, retryDelay.toLong())
}

用法:

flow {
...
}.retryWhen { cause, attempt, delay ->
delay(delay)
...
}

这是我之前回答的一个更复杂和方便的版本,希望对某人有所帮助:

class RetryOperation internal constructor(
private val retries: Int,
private val initialIntervalMilli: Long = 1000,
private val retryStrategy: RetryStrategy = RetryStrategy.LINEAR,
private val retry: suspend RetryOperation.() -> Unit
) {
var tryNumber: Int = 0
internal set
suspend fun operationFailed() {
tryNumber++
if (tryNumber < retries) {
delay(calculateDelay(tryNumber, initialIntervalMilli, retryStrategy))
retry.invoke(this)
}
}
}
enum class RetryStrategy {
CONSTANT, LINEAR, EXPONENTIAL
}
suspend fun retryOperation(
retries: Int = 100,
initialDelay: Long = 0,
initialIntervalMilli: Long = 1000,
retryStrategy: RetryStrategy = RetryStrategy.LINEAR,
operation: suspend RetryOperation.() -> Unit
) {
val retryOperation = RetryOperation(
retries,
initialIntervalMilli,
retryStrategy,
operation,
)
delay(initialDelay)
operation.invoke(retryOperation)
}
internal fun calculateDelay(tryNumber: Int, initialIntervalMilli: Long, retryStrategy: RetryStrategy): Long {
return when (retryStrategy) {
RetryStrategy.CONSTANT -> initialIntervalMilli
RetryStrategy.LINEAR -> initialIntervalMilli * tryNumber
RetryStrategy.EXPONENTIAL -> 2.0.pow(tryNumber).toLong()
}
}

用法:

coroutineScope.launch {
retryOperation(3) {
if (!tryStuff()) {
Log.d(TAG, "Try number $tryNumber")
operationFailed()
}
}
}

Flow Version https://github.com/hoc081098/FlowExt

package com.hoc081098.flowext
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retryWhen
@ExperimentalTime
public fun <T> Flow<T>.retryWithExponentialBackoff(
initialDelay: Duration,
factor: Double,
maxAttempt: Long = Long.MAX_VALUE,
maxDelay: Duration = Duration.INFINITE,
predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
require(maxAttempt > 0) { "Expected positive amount of maxAttempt, but had $maxAttempt" }
return retryWhenWithExponentialBackoff(
initialDelay = initialDelay,
factor = factor,
maxDelay = maxDelay
) { cause, attempt -> attempt < maxAttempt && predicate(cause) }
}
@ExperimentalTime
public fun <T> Flow<T>.retryWhenWithExponentialBackoff(
initialDelay: Duration,
factor: Double,
maxDelay: Duration = Duration.INFINITE,
predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T> = flow {
var currentDelay = initialDelay
retryWhen { cause, attempt ->
predicate(cause, attempt).also {
if (it) {
delay(currentDelay)
currentDelay = (currentDelay * factor).coerceAtMost(maxDelay)
}
}
}.let { emitAll(it) }
}

@Roman Elizarov 的答案的改进版本,使用Iterator/Sequence的回退策略:

private suspend fun <T> retry(
times: Int = 3,              // retry three times
backoffStrategy: Iterator<Long>,
predicate: (R) -> Boolean = { false },
block: suspend (attempt: Int) -> T): T
{
repeat(times - 1) { attempt ->
val result = block(attempt + 1)
if (predicate(result)) {
delay(backoffStrategy.next())
} else {
return result
}
}
return block(times) // last attempt
}

使用Iterator将重试逻辑与退避策略分开,退避策略可以像以下那样简单:

// generates 1000, 1000, 1000 etc.
val linearBackoff = generateSequence(1000) { it }.iterator()

或更复杂的:

val exponentialBackoff = backoffStrategy()
val constantBackoff = backoffStrategy(factor = 1.0)
fun backoffStrategy(
initialDelay: Long = 1000,   // 1 second
maxDelay: Long = 20000,      // 10 second
factor: Double = 2.0,        // exponential backoff base 2
) = generateSequence(initialDelay) { previous ->
previous.times(factor).toLong().coerceAtMost(maxDelay)
}.iterator()

注意:要执行的代码(block)负责处理异常。我通常做面向铁路的编程T所以有点像Either<Error, T>Result<T>

你可以尝试这种简单但非常敏捷的方法,用法简单:

编辑:在单独的答案中添加了更复杂的解决方案。

class Completion(private val retry: (Completion) -> Unit) {
fun operationFailed() {
retry.invoke(this)
}
}
fun retryOperation(retries: Int, 
dispatcher: CoroutineDispatcher = Dispatchers.Default, 
operation: Completion.() -> Unit
) {
var tryNumber = 0
val completion = Completion {
tryNumber++
if (tryNumber < retries) {
GlobalScope.launch(dispatcher) {
delay(TimeUnit.SECONDS.toMillis(tryNumber.toLong()))
operation.invoke(it)
}
}
}
operation.invoke(completion)
}

像这样使用它:

retryOperation(3) {
if (!tryStuff()) {
// this will trigger a retry after tryNumber seconds
operationFailed()
}
}

你显然可以在它的基础上构建更多。

下面是一个 Kotlin 算法,它使用 retryWhen 重试网络请求,尝试之间的延迟增加:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.retryWhen
import java.net.SocketTimeoutException
suspend fun <T> Flow<T>.retryWithBackoff(
maxAttempts: Int,
initialDelayMillis: Long,
maxDelayMillis: Long,
factor: Double,
predicate: (Throwable) -> Boolean = { it is SocketTimeoutException }
): Flow<T> = retryWhen { cause, attempt ->
if (attempt >= maxAttempts || !predicate(cause)) {
throw cause
}
delay(initialDelayMillis * factor.pow(attempt.toDouble()).toLong().coerceAtMost(maxDelayMillis))
}

此函数使用 retryWithBackoff 函数扩展 Flow 类,该函数接收一个整数 maxTry 值,该函数指定函数应尝试连接到服务器的最大次数,一个 initialDelayMillis long,该函数指定第一次尝试之前的初始延迟,一个 maxDelayMillis long,它指定尝试之间的最大延迟,一个因子双精度,该因子指定延迟在尝试之间应增加的因子, 以及一个可选的谓词函数,用于确定是否对给定异常重试。

该函数使用 retryWhen重试网络请求,当发生异常并且谓词返回 true 时,它会重新订阅流。重试之间的延迟随着每次尝试呈指数级增长,直至达到 maxDelayMillis 指定的最大延迟。

下面是此函数的示例用法:

val 流量 =//您的流量在这里

flow.retryWithBackoff(maxAttempts = 5, initialDelayMillis = 1000L, maxDelayMillis = 10000L, factor = 2.0)
.catch { e -> println("Failed after 5 attempts: ${e.message}") }
.collect { value -> println("Value: $value") }

此代码最多重试网络请求 5 次,初始延迟为 1000 毫秒,两次尝试之间的最大延迟为 10000 毫秒。尝试之间的延迟将以 2.0 的系数呈指数级增长。

由艾冰生成

相关内容

  • 没有找到相关文章

最新更新