我使用以下代码在blocking
块内获得JDBC连接,并将该连接传递给fn: Connection => Future[_]
。fn完成后,我想提交/回滚事务并关闭连接。
def withTransactionAsync[T](fn: Connection => Future[T]): Future[T] =
Future {
blocking {
ds.getConnection
}
}.flatMap { conn =>
fn(conn)
.map { r => conn.commit(); conn.close(); r }
.recoverWith {
case e: Throwable =>
conn.rollback()
conn.close()
throw e
}
}
我使用的是一个基于ForkJoinPool
的独立执行上下文。
如果有足够的调用,此代码将进入死锁。凭直觉,这是有道理的。使用getConnection
调用的第一个future在等待可用连接时被阻塞,而可用连接正在等待ExecutionContext中的可用线程运行commit(); close()
块以释放连接,并释放执行上下文中的线程以供getConnection
运行。我用线程转储验证了这一点。
我发现解决这个问题的唯一方法是在同一个Future {}
上运行所有内容,从而避免切换上下文:
def withTransactionAsync[T](fn: Connection => Future[T]): Future[T] =
Future {
blocking {
val conn = ds.getConnection
try {
conn.setAutoCommit(false)
val r = Await.result(fn(conn), Duration.Inf)
conn.commit()
r
} catch {
case e: Throwable =>
conn.rollback()
throw e
} finally
conn.close()
}
}
但这样我就屏蔽了Await.result
。我想这不是一个大问题,因为我在blocking
块内进行阻塞,但我担心这会产生不可预见的后果,而且不一定是这个API的调用方所期望的。
有没有一种方法可以在不使用Await
的情况下绕过这种僵局,只依靠Future组合?
我想可以这样做:这个函数不接受Connection => Future[T]
,而只接受Connection => T
,但我想保留那个API。
如果我将ForkJoinPool
的大小增加到足够大,它是有效的,但对于所有工作负载来说,这个大小很难计算/预测,而且我不希望ForkJoinPool
的大小是我的数据库池的许多倍。
如注释中所述,fn
是阻塞代码。但是它不在blocking
子句中,所以它将绑定池中的一个主线程。如果这种情况发生足够多次,则池将耗尽线程,系统将死锁。
因此,对fn
的调用和随后的代码需要位于blocking
子句内部,以便为其创建一个单独的线程,并且主线程仍然可用于非阻塞代码。
考虑到阻塞代码的数量,可能值得研究一个Task
模型,该模型每个连接有一个线程,而不是每个挂起操作有一个螺纹,这样线程的数量就会受到限制。这基本上是为了解决getConnection
是同步的这一事实,这是HikariCP的一个问题。