我有一个Play
插件,它每15分钟执行一个方法,如下所示:
import scala.concurrent.Future
import scala.concurrent.duration._
import play.api.libs.concurrent.Akka
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.{Application, Plugin}
import akka.actor.Cancellable
class AuthPlugin(app: Application) extends Plugin {
private var cancellableDoSomething: Option[Cancellable] = None
...
override def onStart = {
cancellableDoSomething = Some(
Akka.system.scheduler.scheduleOnce(0.seconds) {
doSomething.foreach { _ =>
Akka.system.scheduler.scheduleOnce(15.minutes)(doSomething)
}
}
)
}
override def onStop = {
cancellableDoSomething.foreach(_.cancel)
}
}
object AuthPlugin {
...
def doSomething: Future[Unit] = {
// Access to shared DB here...
}
}
假设我有两个Play
应用程序实例在两个不同的主机上运行,当然,它们每15分钟调用一次doSomething
。
如何确保在某个特定时刻只有一个实例实际运行doSomething
?
编辑
两个Play
实例访问同一数据库,doSomething
处理状态为pending
的记录。在状态pending
中的记录被处理之后,它被更新为状态processed
。
如果第一个Play
实例在数据库中查询挂起的订单,并且在处理这些订单时,第二个Play
实例运行相同的查询,则它会获得一个记录集,该记录集也由第一个Play
实例已经查询但尚未处理的挂起订单组成。为了避免这种情况,Play
实例应该能够调用doSomething
,当且仅当没有其他实例正在执行它时
我可以实现一种基于DB的信号量,其中获取它的第一个Play
实例将字段的值设置为1
或其他任何值。。。但是这种解决方案是不稳定的,因为如果Play
实例失败并且从未将字段设置回0
,那么其他Play
实例就不再能够获取信号量并调用doSomething
。
使用为某些操作提供锁定机制的数据库,例如在UPDATE期间
示例:MySQL:
"MySQL对InnoDB表使用行级锁定">
"MySQL对MyISAM、MEMORY和MERGE表使用表级锁定">
这意味着您可以使用上面的任何一个表来获得锁定机制。
锁定程序
1.找到你的机器想要做的工作。假设你用jobId=123
找到了一份
2.尝试原子锁定
UPDATE jobs SET state='started' and worker='worker1' WHERE jobId=123 AND state='pending';
3.检查获取的锁是否由机器'worker1'
获取
SELECT COUNT(*) FROM JOBS where jobId=123 AND state='started' AND worker='worker1';
4.如果上述查询返回1,则'worker1'
已成功保留作业,不包括所有其他机器。如果查询返回0,则意味着其他机器(例如'worker2'
(首先获取了锁。
作业完成时
UPDATE jobs SET state='finished' WHERE jobId=123;<br>
如果预留机出现故障怎么办
当尝试锁定(通过UPDATE(时,还要添加一个时间戳列。然后,让另一个后台作业定期清除早于1小时但状态仍然"启动"的作业。