引发异常或使用Future.failed使任务失败



我正在使用scala期货来轮询API状态。继续轮询,直到返回成功或失败。然而,当任何批处理失败时,它都应该抛出错误并停止程序。

我不能通过throw(exception(、Future.success(false(或Future.failed(new exception("error"((抛出错误。


package FutureScheduler
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global

object delayTest extends App {
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

def getStatus(batchId: String): String = {
print(s" $batchId ${System.currentTimeMillis()} continue n")
batchId match {
case "batch1" => "success"
case "batch2" => "failed"
case "batch2" => "success" 
}
}

def waitTask(batch: (String, Int)
): Future[Boolean] =
Delayed(x.seconds)(getStatus(batch._1)).flatMap {
case "success" =>
print(s"n${batch._1} succeeded for ${batch._2}")
Future.successful(true)
case "failed"  =>
print(s"n${batch._1} failed for ${batch._2}")
Future.failed(new Exception("error"))
throw new RuntimeException("errored")
case _ => {
waitTask(batch)
}
}

val statusList =  List(Some("batch1", 123), None, Some("batch2", 124)).flatten
val y = 1
val x = 5
try {
Await.ready(Future.traverse(statusList)((waitTask _)), y.minutes)
}
catch {
case e: Exception => println("caught error")
}
print("nbye now")
}
import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}
object Delayed {
import java.util.{Timer, TimerTask}
private val timer = new Timer
def apply[T](delay: Duration)(task: => T): Future[T] = {
val promise = Promise[T]()
val tt = new TimerTask {
override def run(): Unit = promise.success(task)
}
timer.schedule(tt, delay.toMillis)
promise.future
}
}

throw发生在Delayed返回的Future内部,因此它将被该Future捕获。

您需要将Await.ready转换为Await.result,然后查看它返回的值以获得测试结果。

相关内容

最新更新