Kotlin 协程未来等待超时(无取消)



鉴于我们有一个CompletableFuture f,在 kotlin 可暂停的范围内,我们可以调用f.await(),我们将暂停直到它完成。

我在实现带有签名f.await(t)的类似函数时遇到问题,该函数必须暂停最多 t 毫秒,或者如果将来在该持续时间内完成(以先发生者为准),则更快返回。

这是我尝试过的。

/**
 * Suspend current method until future is done or specified duration expires,
 * whichever happens first without cancelling the future.
 * Returns true if its done, false otherwise.
 */
suspend fun <T> ListenableFuture<T>.await(duration: Long): Boolean {
   val future = this
   try {
      withTimeout(duration) {
         withContext(NonCancellable) { // this does not help either
            future.await() // i do not expect the future itself to be cancelled
         }
      }
   } catch (t: TimeoutCancellationException) {
      // we expected this
   } catch (e: Throwable) {
      e.printStackTrace()
   }
   return future.isDone
}
fun main(args: Array<String>) = runBlocking<Unit> {
   val future = GlobalScope.future {
      try {
         repeat(5) {
            println("computing")
            delay(500)
         }
         println("complete")
      } finally {
         withContext(NonCancellable) {
            println("cancelling")
            delay(500)
            println("cancelled")
         }
      }
   }
   for (i in 0..10) {
      if (future.await(2000)) {
         println("checking : done")
      } else {
         println("checking : not done")
      }
   }
}

我还需要一个类似的工作功能。但也许对此的解决方案也会帮助我......

为此输出为

computing
computing
computing
computing
checking : done
checking : done
checking : done
checking : done
cancelling
checking : done
checking : done
checking : done
checking : done
checking : done
checking : done
checking : done

我写了一些测试代码:

fun main(args: Array<String>) = runBlocking {
    val future = calculateAsync()
    val result = future.await(2000)
    println("result=$result")
}
suspend fun <T> CompletableFuture<T>.await(duration: Long): T? {
    val future = this
    var result: T? = null
    try {
        withTimeout(duration) {
            result = future.await()
        }
    } catch (t: TimeoutCancellationException) {
        println("timeout exception")
    } catch (e: Throwable) {
        e.printStackTrace()
    }
    return result
}
@Throws(InterruptedException::class)
fun calculateAsync(): CompletableFuture<String> {
    val completableFuture = CompletableFuture<String>()
    Executors.newCachedThreadPool().submit {
        Thread.sleep(3000)
        println("after sleep")
        completableFuture.complete("Completed")
    }
    return completableFuture
}

运行此代码后,我们将得到一个输出:

timeout exception
result=null
after sleep

我们看到扩展函数await返回null,因为我们将超时设置为 2000 毫秒,但CompletableFuture在 3000 毫秒后完成。在这种情况下,CompletableFuture被取消(其isCancelled属性返回true),但我们在函数calculateAsync运行的线程继续执行(我们在日志after sleep中看到它)。

如果我们在main函数中future.await(4000)将超时持续时间设置为 4000 毫秒,我们将看到下一个输出:

after sleep
result=Completed

现在我们得到了一些结果,因为CompletableFuture的执行速度超过了 4000 毫秒。

这是我想出的,我相信这不是一个好的解决方案,因为我很可能为相当原始的任务创造了很多垃圾。


suspend fun <T> CompletableFuture<T>.await(duration: Millis): Boolean {
   val timeout = CompletableFuture<Unit>()
   GlobalScope.launch {
      delay(duration)
      timeout.complete(Unit)
   }
   val anyOfTwo = CompletableFuture.anyOf(this, timeout)
   anyOfTwo.await()
   return this.isDone
}

fun main() = runBlocking {
   val future = CompletableFuture<String>()
   GlobalScope.launch {
      delay(2000)
      println("setting the result (future now ${future.isDone})")
      future.complete("something")
   }
   while (future.isNotDone()) {
      println("waiting for the future to complete for the next 500ms")
      val isDone = future.await(500)
      if (isDone) {
         println("future is done")
         break
      } else {
         println("future not done")
      }
   }
   Unit
}

这将给出

waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
setting the result (future now false)
future is done

这就是我们想要的...

最新更新