我正在尝试使用数据库actor将记录插入数据库。 我有数百万条记录。但是,在操作后,数据库只有十条记录。我知道数据库连接是一种状态,我认为这种情况有问题。这是我的代码表示。
class DBActor extends Actor with DBConfig {
override def receive: Receive = {
case Message(id, title) =>
db.run(products += Product(id, title))
}
}
数据库是一个关系数据库,"产品"是一个TableQuery,DBConfig有一个数据库连接和会话。在保证下插入此参与者的记录的最佳方法是什么。
使用批处理而不是逐个保留记录。db.run() 方法是异步的,因此它立即返回将来的执行,稍后在不同的线程上执行,因此 Actor 除了方法调用(db.run)之外没有做任何事情。您应该在 db.run() 方法的结果上抵制回调(onFailure),以便查看是否发生任何故障。请参阅示例(它不是编译的代码):
case object Insert
case object Flush
class DBActor extends Actor with DBConfig {
implicit val dispatcher = context.dispatcher
val bulkLimit:Int = 1000
val flushTime:Int = 10
var documents = List.empty[Product]
/***
* Start automatic flushing when actor start
*/
context.system.scheduler.scheduleOnce(flushTime second, self, Flush)
def receive:Receive={
case document: Document =>
documents =documents :+ document
log.info(s"DBActor received document [total count ${documents.length}]")
if (documents.length >= bulkLimit) self ! Insert
case Insert =>
if (documents.length > 0) {
val batch = documents.take(bulkLimit)
db.run(products ++= batch).onFailure { case ex: Exception => log.error("Getting error on persisting data", ex) }
documents = documents.drop(bulkLimit)
}
case Flush =>
if (documents.length > 0) self ! Insert
context.system.scheduler.scheduleOnce(flushTime second, self, Flush)
}
}