使用Future{blocking{}}避免死锁



我使用以下代码在blocking块内获得JDBC连接,并将该连接传递给fn: Connection => Future[_]。fn完成后,我想提交/回滚事务并关闭连接。

def withTransactionAsync[T](fn: Connection => Future[T]): Future[T] =
Future {
blocking {
ds.getConnection
}
}.flatMap { conn =>
fn(conn)
.map { r => conn.commit(); conn.close(); r }
.recoverWith {
case e: Throwable =>
conn.rollback()
conn.close()
throw e
}
}

我使用的是一个基于ForkJoinPool的独立执行上下文。

如果有足够的调用,此代码将进入死锁。凭直觉,这是有道理的。使用getConnection调用的第一个future在等待可用连接时被阻塞,而可用连接正在等待ExecutionContext中的可用线程运行commit(); close()块以释放连接,并释放执行上下文中的线程以供getConnection运行。我用线程转储验证了这一点。

我发现解决这个问题的唯一方法是在同一个Future {}上运行所有内容,从而避免切换上下文:

def withTransactionAsync[T](fn: Connection => Future[T]): Future[T] =
Future {
blocking {
val conn = ds.getConnection
try {
conn.setAutoCommit(false)
val r = Await.result(fn(conn), Duration.Inf)
conn.commit()
r
} catch {
case e: Throwable =>
conn.rollback()
throw e
} finally
conn.close()
}
}

但这样我就屏蔽了Await.result。我想这不是一个大问题,因为我在blocking块内进行阻塞,但我担心这会产生不可预见的后果,而且不一定是这个API的调用方所期望的。

有没有一种方法可以在不使用Await的情况下绕过这种僵局,只依靠Future组合?

我想可以这样做:这个函数不接受Connection => Future[T],而只接受Connection => T,但我想保留那个API。

如果我将ForkJoinPool的大小增加到足够大,它是有效的,但对于所有工作负载来说,这个大小很难计算/预测,而且我不希望ForkJoinPool的大小是我的数据库池的许多倍。

如注释中所述,fn是阻塞代码。但是它不在blocking子句中,所以它将绑定池中的一个主线程。如果这种情况发生足够多次,则池将耗尽线程,系统将死锁。

因此,对fn的调用和随后的代码需要位于blocking子句内部,以便为其创建一个单独的线程,并且主线程仍然可用于非阻塞代码。

考虑到阻塞代码的数量,可能值得研究一个Task模型,该模型每个连接有一个线程,而不是每个挂起操作有一个螺纹,这样线程的数量就会受到限制。这基本上是为了解决getConnection是同步的这一事实,这是HikariCP的一个问题。

相关内容

  • 没有找到相关文章