我有Resilience4j版本:1.7.1,Kotlin版本:1.7.0,KotlinCoroutines:1.6.1。
我想在kotlin代码中使用RateLimit和Retry,但文档中没有包含如何使用kotlin Flow的信息。
我有一个简单的代码:
suspend main() {
val rateLimiterConfig = RateLimiterConfig.custom()
.limitForPeriod(2)
.limitRefreshPeriod(Duration.ofSeconds(1))
.timeoutDuration(Duration.ofSeconds(2))
.build()
val rateLimiter = RateLimiter.of("rate-limiter", rateLimiterConfig)
val retryConfig = RetryConfig.custom<Any>()
.maxAttempts(3)
.retryExceptions(Exception::class.java)
.build()
val retry = Retry.of("retry", retryConfig)
coroutineScope {
flowOf(1,2,3,4,5,6,7,8)
.rateLimiter(rateLimiter)
.retry(retry)
.map { async { process(it) } }
.toList().awaitAll()
}
}
suspend fun process(num: Int): Int {
println("time: ${getTime()}, value: $num")
if(num >= 8) throw Exception()
delay(1000)
return num * num
}
我没有任何限制或重试。如果用打印时间(mm:SSS.SSS(和传入值运行此代码,我会得到以下信息:
time: 46:26.488,value: 7
time: 46:26.488,value: 4
time: 46:26.488,value: 3
time: 46:26.488,value: 1
time: 46:26.488,value: 6
time: 46:26.488,value: 5
time: 46:26.488,value: 8
time: 46:26.488,value: 2
Exception in thread "main" java.lang.Exception
at MainKt.process(Main.kt:165)
at MainKt$main$2$1$1.invokeSuspend(Main.kt:142)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:749)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
它是如何工作的?
我想这就是你想要的:
coroutineScope {
flowOf(1,2,3,4,5,6,7,8)
.rateLimiter(rateLimiter)
.map { process(it) }
.retry(retry)
.toList()
}
1.重试
Resilience4j的retry
在引擎盖下使用Flow.retryWhen
。要使其工作,您必须在.map
调用后使用它。此外,.retry
操作员将重试整个流,而不仅仅是失败的操作。
kotlinx.coroutines
文档:
当上游流中发生异常并且谓词返回true时,重试给定流的集合。
此运算符对下游流中发生的异常是透明的,并且不会重试为取消流而引发的异常。
2.速率限制
使用async { }
和.awaitAll
有点并行化了整个过程,所以rateLimiter
无法完成它的工作。只需执行.map { process(it) }
。