我想做什么
我有一个带有多个工作节点的云应用程序,每个工作节点可以处理n个导出请求。导出请求被放入我们数据库中的表中。每个导出请求只能运行一次。如果工作节点当前运行的导出请求少于n个,它会每隔0.1-1秒检查一次,看看是否有可以开始处理的导出请求。
使用存储过程将等待运行的导出请求组织到数据库端的优先级队列中。下面的函数(queuePop()
)是调用数据库的函数。它从优先级队列中获得最高结果,将导出请求标记为正在使用(当状态不再设置为"NEW"时,它被视为正在使用),然后返回要在其他地方处理的导出请求。
public function queuePop() {
$entity_manager->getConnection()->beginTransaction();
// Get every new export request
try {
$statment = $entity_manager->getConnection()->prepare( 'EXEC root.findAllNewExportRequests' );
$statment->execute();
$results = $statment->fetchAll();
if ( count( $results ) > 0 ) {
$queue_item = array (
'export_request_id' => $results[0]['request_id']
);
} else {
$entity_manager->getConnection()->commit();
return NULL;
}
// Return the export request if it exists
$export_request = $entity_manager->find( 'ApplicationModelExportRequest', $queue_item['export_request_id'], DoctrineDBALLockMode::PESSIMISTIC_READ );
if ( !$export_request ) {
// Export request not found
throw new Exception( 'Export request not found.' );
}
// Update the export request
$export_request->setStatus( 'QUEUED' );
// Update the export request in the database
$entity_manager->persist( $export_request );
$entity_manager->flush();
$entity_manager->getConnection()->commit();
return $queue_item;
} catch ( Exception $ex ) {
$entity_manager->getConnection()->rollback();
exit();
}
}
问题
queuePop函数非常有效,它确保单个工作节点永远不会多次运行导出请求,但当有两个或多个节点时,可能会有多个工作节点接收该请求(考虑到它收到的请求数量,这种情况经常发生)。我尝试实现悲观锁定,如图所示(http://doctrine-orm.readthedocs.org/en/latest/reference/transactions-and-concurrency.html#pessimistic-锁定),但它不起作用。文档不是很好,我找不到任何正确使用悲观锁定的例子。
当一个工作节点正在读取/更新导出请求时,我需要其他节点不能(它们必须等待)。
我使用的内容
- nginx版本:nginx/1.4.6(Ubuntu)
- Ubuntu 14.04.1 LTS(GNU/Linux 3.13.0-32-generix x86_64)
- PHP 5.5.9-1ubuntu4.3(fpm-fcgi)
- Zend Engine v2.5.0与Zend OPcache v7.0.3
- 条令2
- Azure SQL
您应该使用悲观写(Doctrine\DBAL\LockMode::PESIMISTIC_WRITE),锁定底层数据库行以进行并发读写操作。而PESIMISTIC_READ仅阻止行被更新而不阻止读取
您不需要存储过程就可以实现这一点。在模型和请求队列表中添加一个足够长的char
列hash
左右,在处理下一个请求之前,从工作进程中计算一个随机的唯一ID(PHP内置函数uniqid()
是一个很好的选择),并发出以下SQL:
UPDATE export_request SET status = "queued", hash = :uniqueHash
WHERE status = "waiting"
ORDER BY id ASC LIMIT 1
(参数uniqueHash
设置为您之前计算的唯一ID,假设id
可用于确定优先级)。
接下来,检索您刚才标记为"的实体;排队的";通过您传递给散列列的CCD_ 7;标记";。
诀窍在于,UPDATE查询是完全原子的,您不会因为竞争条件而面临任何双重处理的风险。您甚至不需要两个语句都有一个环绕事务(UPDATE查询将在自动提交模式下的事务中运行,并且DBMS将确保不会有任何行被排队两次,因为status = "waiting"
条件在哈希字段设置为唯一ID的同时会变为无效)。