Kotlin Flow<T> with Resilience4j RateLimiter and Retry



我有Resilience4j版本:1.7.1,Kotlin版本:1.7.0,KotlinCoroutines:1.6.1。

我想在kotlin代码中使用RateLimit和Retry,但文档中没有包含如何使用kotlin Flow的信息。

我有一个简单的代码:

suspend main() {
val rateLimiterConfig = RateLimiterConfig.custom()
.limitForPeriod(2)
.limitRefreshPeriod(Duration.ofSeconds(1))
.timeoutDuration(Duration.ofSeconds(2))
.build()
val rateLimiter = RateLimiter.of("rate-limiter", rateLimiterConfig)

val retryConfig = RetryConfig.custom<Any>()
.maxAttempts(3)
.retryExceptions(Exception::class.java)
.build()
val retry = Retry.of("retry", retryConfig)

coroutineScope {
flowOf(1,2,3,4,5,6,7,8)
.rateLimiter(rateLimiter)
.retry(retry)
.map { async { process(it) } }
.toList().awaitAll()
}
}
suspend fun process(num: Int): Int {
println("time: ${getTime()}, value: $num")
if(num >= 8) throw Exception()
delay(1000)
return num * num
}

我没有任何限制或重试。如果用打印时间(mm:SSS.SSS(和传入值运行此代码,我会得到以下信息:

time: 46:26.488,value: 7
time: 46:26.488,value: 4
time: 46:26.488,value: 3
time: 46:26.488,value: 1
time: 46:26.488,value: 6
time: 46:26.488,value: 5
time: 46:26.488,value: 8
time: 46:26.488,value: 2
Exception in thread "main" java.lang.Exception
at MainKt.process(Main.kt:165)
at MainKt$main$2$1$1.invokeSuspend(Main.kt:142)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:749)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)

它是如何工作的?

我想这就是你想要的:

coroutineScope {
flowOf(1,2,3,4,5,6,7,8)
.rateLimiter(rateLimiter)
.map { process(it) }
.retry(retry)
.toList()
}

1.重试

Resilience4j的retry在引擎盖下使用Flow.retryWhen。要使其工作,您必须在.map调用后使用它。此外,.retry操作员将重试整个,而不仅仅是失败的操作。

kotlinx.coroutines文档:

当上游流中发生异常并且谓词返回true时,重试给定流的集合。

此运算符对下游流中发生的异常是透明的,并且不会重试为取消流而引发的异常。

2.速率限制

使用async { }.awaitAll有点并行化了整个过程,所以rateLimiter无法完成它的工作。只需执行.map { process(it) }

最新更新