如果另一个未来操作失败,如何取消该操作



我有两个期货(数据库表上的两个操作),我希望在保存修改之前检查这两个期货是否都成功完成。

现在,我在第一个中开始第二个未来(作为依赖),但我知道这不是最好的选择。我知道我可以使用for-理解来并行执行两个期货,但即使其中一个失败,另一个也会被执行(尚未测试)

firstFuture.dropColumn(tableName) match {
case Success(_) => secondFuture.deleteEntity(entity)
case Failure(e) => throw new Exception(e.getMessage)
}
// the  first future alters a table, drops a column
// the second future deletes a row from another table

在这种情况下,如果第一个future成功执行,则第二个future可能会失败。我想恢复第一个未来的更新。我听说过SQL事务,似乎是这样的,但怎么回事?

val futuresResult = for {
first <- firstFuture.dropColumn(tableName)
second <- secondFuture.deleteEntity(entity)
} yield (first, second)

在我的情况下,for-理解要好得多,因为我在这两个未来之间没有依赖关系,可以并行执行,但这并不能解决我的问题,结果可以是(成功,成功)或(失败,成功)。

关于顺序运行与并行运行的Future

这有点棘手,因为ScalaFuture被设计为热切。在各种Scala库中,还有一些其他结构可以处理同步和异步效果,例如catsIO、MonixTaskZIO等,它们是以lazy的方式设计的,但它们没有这种行为。

Future渴望的是它会尽快开始计算。这里的"启动"是指将其安排在显式选择或隐式显示的ExecutionContext上。虽然从技术上讲,在调度器决定这样做的情况下,执行可能会暂停一点,但它很可能会几乎立即启动。

因此,如果您有一个类型为Future的值,它将立即开始运行。如果您有一个Future类型的惰性值,或者一个返回Future类型值的函数/方法,那么它不是。

但是,即使你所拥有的都是简单的值(没有懒惰的vals或defs),如果Future定义是在中完成的,以便于理解,那么这意味着它是一个单元flatMap链的一部分(如果你不理解,现在忽略它),并且它将按顺序运行,而不是并行运行。为什么?这并不特定于Future;每一个for理解都有一个顺序链的语义,在这个顺序链中,您可以将上一步的结果传递给下一步。因此,如果某个程序依赖于步骤n中的某个程序,则不能在步骤n+1运行该程序是合乎逻辑的。

这里有一些代码来演示这一点。

val program = for {
_ <- Future { Thread.sleep(5000); println("f1") }
_ <- Future { Thread.sleep(5000); println("f2") }
} yield ()
Await.result(program, Duration.Inf)

此程序将等待5秒钟,然后打印"f1",再等待5秒钟后再打印"f2"。

现在让我们来看看这个:

val f1 = Future { Thread.sleep(5000); println("f1") }
val f2 = Future { Thread.sleep(5000); println("f2") }
val program = for {
_ <- f1
_ <- f2
} yield ()
Await.result(program, Duration.Inf)

然而,该程序将在五秒钟后同时打印"f1"one_answers"f2"。

请注意,在第二种情况下并没有真正违反序列语义。CCD_ 15仍然有机会使用CCD_ 16的结果。但CCD_ 17没有使用CCD_ 18的结果;它是一个可以立即计算的独立值(用val定义)。因此,如果我们将val f2更改为一个函数,例如def f2(number: Int),则执行会更改:

val f1 = Future { Thread.sleep(5000); println("f1"); 42 }
def f2(number: Int) = Future { Thread.sleep(5000); println(number) }
val program = for {
number <- f1
_ <- f2(number)
} yield ()

正如您所料,这将在五秒钟后打印"f1",只有到那时,另一个Future才会启动,因此它将在五秒后打印"42"。

关于交易:

正如@cbley在评论中提到的,这听起来像是你想要数据库事务。例如,在SQL数据库中,这有一个非常特殊的含义,它确保了ACID属性。

如果这正是您所需要的,那么您需要在数据库层上解决它。CCD_ 23太通用了;它只是一种模拟同步和异步计算的效果类型。当您看到Future值时,仅通过查看类型,您无法判断它是数据库调用的结果,还是某种HTTP调用的结果。

例如,doobie将每个数据库查询描述为ConnectionIO类型。您可以在中排列多个查询以进行理解,就像使用Future:一样

val program = for {
a <- database.getA()
_ <- database.write("foo")
b <- database.getB()
} yield {
// use a and b
}

但与前面的示例不同,这里的getA()getB()不返回Future[A]类型的值,而是返回ConnectionIO[A]类型的值。有趣的是,doobie完全考虑到了这样一个事实,即您可能希望这些查询在单个事务中运行,因此如果getB()失败,"foo"将不会提交到数据库。

因此,在这种情况下,您要做的是获得查询集的完整描述,将其封装为类型为ConnectionIO的单个值program,一旦您想实际运行事务,您就可以执行类似program.transact(myTransactor)的操作,其中myTransactorTransactor的实例,这是一个知道如何连接到物理数据库的doobie构造。

一旦你进行了交易,你的ConnectionIO[A]就会变成Future[A]。如果事务失败,则会有一个失败的Future,并且不会有任何内容真正提交到数据库中。

如果您的数据库操作彼此独立,并且可以并行运行,doobie也会允许您这样做。文档中很好地解释了通过doobie提交事务的顺序和并行方式。

最新更新