如何改进这段代码并将其重写为 kotlin 协程



我正在尝试实现功能:我有一个 rest 端点,它调用执行可能需要大量时间的代码。我现在改善体验的想法是将该段代码包装为新线程,等待完成或经过一些最大时间并返回适当的消息。即使端点已经发回消息,包装的代码也应该完成。当前实现如下所示:

private const val N = 1000
private const val MAX_WAIT_TIME = 5000
@RestController
@RequestMapping("/long")
class SomeController(
val service: SomeService,
) {
private val executor = Executors.newFixedThreadPool(N)
@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
val submit = executor.submit {
service.longExecution(someParam)
}
val start = System.currentTimeMillis()
while (System.currentTimeMillis() - start < MAX_WAIT_TIME) {
if (submit.isDone)
return ResponseEntity.ok("Done")
}
return ResponseEntity.ok("Check later")
}
}

第一个问题是 - 等待时间似乎不对,我们不发布线程,可以改进吗?

更重要的问题 - 如何将其重写为 Kotlin 协程? 我的尝试很简单,任务完成后没有立即返回,看起来像这样:

@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> = runBlocking {
val result = async {
withContext(Dispatchers.Default) {
service.longExecution(someParam)
}
}
delay(MAX_WAIT_TIME)
return@runBlocking ResponseEntity.ok(if(result.isCompleted) "Done" else "Check later")
}

但即使返回正确的字符串,在完成longExecution之前也不会发送答案。如何解决这个问题,我错过了什么?也许协程在这里是糟糕的应用程序?

您当前的协程尝试存在几个问题:

  1. 您将在runBlocking的作用域内启动async计算,因此整个端点方法将等待子协程完成,尽管您在此之前尝试return-Ing。
  2. delay()将始终等待MAX_WAIT_TIME即使任务完成得比这更快
  3. (可选)如果你的框架支持异步控制器方法,你根本不需要使用runBlocking(Spring WebFlux 确实支持控制器中的suspend函数)

对于第一个问题,请记住,每次启动应该比函数寿命更长的协程时,都必须使用外部作用域。coroutineScoperunBlocking不适合这些情况,因为它们将等待子协程完成。

可以使用CoroutineScope()工厂函数创建作用域,但需要考虑协程的生存期以及何时取消协程。如果longExecution函数有一个错误并且永远挂起,你不想泄露调用它的协程并破坏你的内存,所以你应该以某种方式取消这些协程。这就是为什么你应该将作用域存储为类中的变量,并在适当的时候取消它(当你想放弃这些操作时)。

对于第二个问题,使用withTimeout非常普遍,但它不适合您的用例,因为您希望任务即使在等待超时后也能继续执行。一种可能的解决方案是使用 select 子句等待作业完成,或等待某个指定的最长时间:

// TODO call scope.cancel() somewhere appropriate (when this component is not needed anymore)
val scope = CoroutineScope(Job())
@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
val job = scope.launch {
longExecution()
}

val resultText = runBlocking {
select {
job.onJoin() { "Done" }
onTimeout(MAX_WAIT_TIME) { "Check later" } 
}
}
return ResponseEntity.ok(resultText)
}

注意:我使用的是launch而不是async,因为您似乎不需要这里的longExecution返回值。


如果你也想解决问题#3,你可以简单地声明你的处理程序suspend并删除select周围的runBlocking

// TODO call scope.cancel() somewhere appropriate (when this component is not needed anymore)
val scope = CoroutineScope(Job())
@PostMapping
suspend fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
val job = scope.launch {
longExecution()
}

val resultText = select {
job.onJoin() { "Done" }
onTimeout(MAX_WAIT_TIME) { "Check later" }
}
return ResponseEntity.ok(resultText)
}

请注意,这需要spring-boot-starter-webflux而不是spring-boot-starter-web

您的实现始终等待MAX_WAIT_TIME。这可能有效:

@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> = runBlocking {
try {
withTimeout(MAX_WAIT_TIME) {
async {
withContext(Dispatchers.Default) {
service.longExecution(someParam)
}
}
}
} catch (ex: CancellationException) {
return@runBlocking ResponseEntity.ok("Check later")
}
return@runBlocking ResponseEntity.ok("Done")
}

虽然我不确定是否会有任何不必要的副作用,因为这似乎会在协程达到MAX_WAIT_TIME时取消它。在此处阅读更多相关信息:
取消和超时

最新更新