我有两个期货(数据库表上的两个操作),我希望在保存修改之前检查这两个期货是否都成功完成。
现在,我在第一个中开始第二个未来(作为依赖),但我知道这不是最好的选择。我知道我可以使用for
-理解来并行执行两个期货,但即使其中一个失败,另一个也会被执行(尚未测试)
firstFuture.dropColumn(tableName) match {
case Success(_) => secondFuture.deleteEntity(entity)
case Failure(e) => throw new Exception(e.getMessage)
}
// the first future alters a table, drops a column
// the second future deletes a row from another table
在这种情况下,如果第一个future成功执行,则第二个future可能会失败。我想恢复第一个未来的更新。我听说过SQL事务,似乎是这样的,但怎么回事?
val futuresResult = for {
first <- firstFuture.dropColumn(tableName)
second <- secondFuture.deleteEntity(entity)
} yield (first, second)
在我的情况下,for
-理解要好得多,因为我在这两个未来之间没有依赖关系,可以并行执行,但这并不能解决我的问题,结果可以是(成功,成功)或(失败,成功)。
关于顺序运行与并行运行的Future
:
这有点棘手,因为ScalaFuture
被设计为热切。在各种Scala库中,还有一些其他结构可以处理同步和异步效果,例如catsIO
、MonixTask
、ZIO
等,它们是以lazy的方式设计的,但它们没有这种行为。
Future
渴望的是它会尽快开始计算。这里的"启动"是指将其安排在显式选择或隐式显示的ExecutionContext
上。虽然从技术上讲,在调度器决定这样做的情况下,执行可能会暂停一点,但它很可能会几乎立即启动。
因此,如果您有一个类型为Future
的值,它将立即开始运行。如果您有一个Future
类型的惰性值,或者一个返回Future
类型值的函数/方法,那么它不是。
但是,即使你所拥有的都是简单的值(没有懒惰的vals或defs),如果Future
定义是在中完成的,以便于理解,那么这意味着它是一个单元flatMap链的一部分(如果你不理解,现在忽略它),并且它将按顺序运行,而不是并行运行。为什么?这并不特定于Future
;每一个for理解都有一个顺序链的语义,在这个顺序链中,您可以将上一步的结果传递给下一步。因此,如果某个程序依赖于步骤n中的某个程序,则不能在步骤n+1运行该程序是合乎逻辑的。
这里有一些代码来演示这一点。
val program = for {
_ <- Future { Thread.sleep(5000); println("f1") }
_ <- Future { Thread.sleep(5000); println("f2") }
} yield ()
Await.result(program, Duration.Inf)
此程序将等待5秒钟,然后打印"f1",再等待5秒钟后再打印"f2"。
现在让我们来看看这个:
val f1 = Future { Thread.sleep(5000); println("f1") }
val f2 = Future { Thread.sleep(5000); println("f2") }
val program = for {
_ <- f1
_ <- f2
} yield ()
Await.result(program, Duration.Inf)
然而,该程序将在五秒钟后同时打印"f1"one_answers"f2"。
请注意,在第二种情况下并没有真正违反序列语义。CCD_ 15仍然有机会使用CCD_ 16的结果。但CCD_ 17没有使用CCD_ 18的结果;它是一个可以立即计算的独立值(用val
定义)。因此,如果我们将val f2
更改为一个函数,例如def f2(number: Int)
,则执行会更改:
val f1 = Future { Thread.sleep(5000); println("f1"); 42 }
def f2(number: Int) = Future { Thread.sleep(5000); println(number) }
val program = for {
number <- f1
_ <- f2(number)
} yield ()
正如您所料,这将在五秒钟后打印"f1",只有到那时,另一个Future
才会启动,因此它将在五秒后打印"42"。
关于交易:
正如@cbley在评论中提到的,这听起来像是你想要数据库事务。例如,在SQL数据库中,这有一个非常特殊的含义,它确保了ACID属性。
如果这正是您所需要的,那么您需要在数据库层上解决它。CCD_ 23太通用了;它只是一种模拟同步和异步计算的效果类型。当您看到Future
值时,仅通过查看类型,您无法判断它是数据库调用的结果,还是某种HTTP调用的结果。
例如,doobie将每个数据库查询描述为ConnectionIO
类型。您可以在中排列多个查询以进行理解,就像使用Future
:一样
val program = for {
a <- database.getA()
_ <- database.write("foo")
b <- database.getB()
} yield {
// use a and b
}
但与前面的示例不同,这里的getA()
和getB()
不返回Future[A]
类型的值,而是返回ConnectionIO[A]
类型的值。有趣的是,doobie完全考虑到了这样一个事实,即您可能希望这些查询在单个事务中运行,因此如果getB()
失败,"foo"将不会提交到数据库。
因此,在这种情况下,您要做的是获得查询集的完整描述,将其封装为类型为ConnectionIO
的单个值program
,一旦您想实际运行事务,您就可以执行类似program.transact(myTransactor)
的操作,其中myTransactor
是Transactor
的实例,这是一个知道如何连接到物理数据库的doobie构造。
一旦你进行了交易,你的ConnectionIO[A]
就会变成Future[A]
。如果事务失败,则会有一个失败的Future
,并且不会有任何内容真正提交到数据库中。
如果您的数据库操作彼此独立,并且可以并行运行,doobie也会允许您这样做。文档中很好地解释了通过doobie提交事务的顺序和并行方式。