我正在使用scala期货来轮询API状态。继续轮询,直到返回成功或失败。然而,当任何批处理失败时,它都应该抛出错误并停止程序。
我不能通过throw(exception(、Future.success(false(或Future.failed(new exception("error"((抛出错误。
package FutureScheduler
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
object delayTest extends App {
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
def getStatus(batchId: String): String = {
print(s" $batchId ${System.currentTimeMillis()} continue n")
batchId match {
case "batch1" => "success"
case "batch2" => "failed"
case "batch2" => "success"
}
}
def waitTask(batch: (String, Int)
): Future[Boolean] =
Delayed(x.seconds)(getStatus(batch._1)).flatMap {
case "success" =>
print(s"n${batch._1} succeeded for ${batch._2}")
Future.successful(true)
case "failed" =>
print(s"n${batch._1} failed for ${batch._2}")
Future.failed(new Exception("error"))
throw new RuntimeException("errored")
case _ => {
waitTask(batch)
}
}
val statusList = List(Some("batch1", 123), None, Some("batch2", 124)).flatten
val y = 1
val x = 5
try {
Await.ready(Future.traverse(statusList)((waitTask _)), y.minutes)
}
catch {
case e: Exception => println("caught error")
}
print("nbye now")
}
import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}
object Delayed {
import java.util.{Timer, TimerTask}
private val timer = new Timer
def apply[T](delay: Duration)(task: => T): Future[T] = {
val promise = Promise[T]()
val tt = new TimerTask {
override def run(): Unit = promise.success(task)
}
timer.schedule(tt, delay.toMillis)
promise.future
}
}
throw
发生在Delayed
返回的Future
内部,因此它将被该Future
捕获。
您需要将Await.ready
转换为Await.result
,然后查看它返回的值以获得测试结果。