Lagom主题订阅者-如何在未来异常中重试



我在lagom中有一个主题订阅者,如下所示

fooService.fooTopic().subscribe
.atLeastOnce(
Flow[fooMsg].map {
case fooMsg(_) =>
foo()
case a =>
println(a)
}.async.map{ _ =>
Done
}
)

要订阅此主题,我使用atLeastOnce作为方法,因此如果出现任何异常,我希望重新启动/重试流。当我抛出一个正常的异常时,它可以继续正常重试

private def foo() = {
throw new RuntimeException("testing error")
}

但当将来发生异常时,无论我如何尝试,Flow都不会重新启动。这是我在未来中处理异常的一次尝试

private def foo() = {
val test: Future[Int] = Future(throw new RuntimeException("asd"))
val result = for {
y1 <- test
} yield (y1)
result.onComplete{
case Success(value) => println("SUCCESS")
case Failure(exception) => println(exception.getMessage)
throw exception
}
}
private def foo() = {
val test: Future[Int] = Future(throw new RuntimeException("asd"))
test.onComplete{
case Success(value) => println("SUCCESS")
case Failure(exception) => println(exception.getMessage)
throw exception
}
}

它将显示一个异常,但Flow不会自动重新启动它。我应该如何处理/抛出Future中的异常?

我认为,如果您将来只失败过一次,就不需要重新启动整个流程。我建议只重试Future。例如,你可以写这样的代码,它将重试你的调用,在你的方法的调用上替换Future.success(10(:

val test: Future[Int] = Future(throw new RuntimeException("asd")).recoverWith {
case NonFatal(e) =>
Future.successful(10)
}
val result = for {
y1 <- test
} yield (y1)

此外,您可以随心所欲地编写代码,它将失败并重试,但您需要返回Future:的结果

kafka.topic1.subscribe.atLeastOnce(Flow[String]
.mapAsync(1) {
case envelope: String =>
val test: Future[String] = Future(throw new RuntimeException("asd"))
/*.recoverWith {
case NonFatal(e) =>
Future.successful("10")
}*/
val result = for {
y1 <- test
} yield (y1)
println(s"code block $envelope")
result.onComplete{
case Success(value) => println(s"Message from topic: $envelope $result")
case Failure(exception) => println(exception.getMessage)
throw exception
}
result.map(_ => Done)
}
)

相关内容

  • 没有找到相关文章

最新更新