在 Akka's Actor with Slick 中插入唱片的最佳方法是什么?



我正在尝试使用数据库actor将记录插入数据库。 我有数百万条记录。但是,在操作后,数据库只有十条记录。我知道数据库连接是一种状态,我认为这种情况有问题。这是我的代码表示。

class DBActor extends Actor with DBConfig {
  override def receive: Receive = {
    case Message(id, title) =>
      db.run(products += Product(id, title))
  }
}
数据库

是一个关系数据库,"产品"是一个TableQuery,DBConfig有一个数据库连接和会话。在保证下插入此参与者的记录的最佳方法是什么。

使用批处理而不是逐个保留记录。db.run() 方法是异步的,因此它立即返回将来的执行,稍后在不同的线程上执行,因此 Actor 除了方法调用(db.run)之外没有做任何事情。您应该在 db.run() 方法的结果上抵制回调(onFailure),以便查看是否发生任何故障。请参阅示例(它不是编译的代码):

case object Insert
case object Flush
class DBActor extends Actor with DBConfig {

   implicit val dispatcher = context.dispatcher
   val bulkLimit:Int = 1000
   val flushTime:Int = 10
  var documents = List.empty[Product]
 /***
  * Start automatic flushing when actor start
  */
  context.system.scheduler.scheduleOnce(flushTime second, self, Flush)
 def receive:Receive={
case document: Document =>
  documents =documents :+ document
  log.info(s"DBActor received document [total count ${documents.length}]")
  if (documents.length >= bulkLimit) self ! Insert
case Insert =>
  if (documents.length > 0) {
    val batch = documents.take(bulkLimit)
    db.run(products ++= batch).onFailure { case ex: Exception => log.error("Getting error on persisting  data", ex) }
    documents = documents.drop(bulkLimit)
  }
case Flush =>
  if (documents.length > 0) self ! Insert
  context.system.scheduler.scheduleOnce(flushTime second, self, Flush)
}
}

最新更新