我们有一个c(中心(服务器和几个d(地区服务器(,如d1、d2、d3、d4、d5。
有一些表要复制。为了简单起见,让我们假设我们有一个tblFoo
表,它也存在于d1、d2、d3、d4、d5和c上,并且它具有相同的结构。规则很简单:
- 如果一个记录存在于d服务器上,那么它存在于c服务器上,并且每个字段的值完全相同
- 如果一个记录存在于一个d服务器上(例如d1上(,那么它不存在于任何其他d服务器(d2、d3、d4或d5(上
目标是确保如果对d服务器的tblFoo
(insert
、update
、delete
(进行了更改,那么也应该在c服务器上立即进行更改。这对insert
非常有效(因为idpkFooID
根据定义具有auto_increment
属性(。这也适用于update
和delete
,但我们对此有些担心。这是(的简化版本(代码:
namespace AppORM;
use CakeORMQuery as ORMQuery;
// Some other use statements
class Query extends ORMQuery
{
//Lots of stuff...
/**
* Overrides a method with the same name to handle synchonizations with c
*/
public function execute()
{
//Some tables need replication. If this is such a table, then we need to perform some extra steps. Otherwise we would just call the parent
//Method
if (($this->_repository->getIgnoreType() || (!in_array($this->type(), ['select']))) && $this->isReplicate() && ($this->getConnection()->configName() !== 'c')) {
//Getting the table
$table = $this->_repository->getTable();
//Replicating the query
$replica = clone $this;
//Setting the connection of the replica to c, because we need to apply the district changes on central
$replica->setParentConnectionType('d')->setConnection(ConnectionManager::get('c'));
$replica->setIgnoreType($this->_repository->getIgnoreType());
//We execute the replica first, because we will need to refer to c IDs and not the other way around
$replica->execute();
//If this is an insert, then we need to handle the ids as well
if (!empty($this->clause('insert'))) {
//We load the primary key's name to use it later to find the maximum value
$primaryKey = $this->_repository->getPrimaryKey();
//We get the highest ID value, which will always be a positive number, because we have already executed the query at the replica
$firstID = $replica->getConnection()
->execute("SELECT LAST_INSERT_ID() AS {$primaryKey}")
->fetchAll('assoc')[0][$primaryKey];
//We get the columns
$columns = $this->clause('values')->getColumns();
//In order to add the primary key
$columns[] = $primaryKey;
//And then override the insert clause with this adjusted array
$this->insert($columns);
//We get the values
$values = $this->clause('values')->getValues();
//And their count
$count = count($values);
//There could be multiple rows inserted already into the replica as part of this query, we need to replicate all their IDs, without
//assuming that there is a single inserted record
for ($index = 0; $index < $count; $index++) {
//We add the proper ID value into all of the records to be inserted
$values[$index][$primaryKey] = $firstID + $index;
}
//We override the values clause with this adjusted array, which contains PK values as well
$this->clause('values')->values($values);
}
}
if ($this->isQueryDelete) {
$this->setIgnoreType(false);
}
//We nevertheless execute the query in any case, independently of whether it was a replicate table
//If it was a replicate table, then we have already made adjustments to the query in the if block
return parent::execute();
}
}
担忧如下:如果我们在d1上执行update
或delete
语句,而另一个地区服务器(d2、d3、d4、d5(上的记录将满足这些语句的条件,那么我们最终会在d1上正确执行update
和delete
语句,但一旦在d1执行了相同的语句,我们可能会意外地从c服务器更新/删除其他地区的记录。
为了解决这个问题,建议的解决方案是验证语句,如果不满足以下条件之一,则抛出异常:
- 条件是
=
或IN
- 字段为
[pk|fk]*ID
,即外键或主键
没有复制行为的表将正常执行execute
,上述限制仅对具有复制行为的表格有效,例如我们的示例中的tblFoo
。
问题
如何在执行覆盖中验证更新/删除查询,以便只能搜索主键或外键,并且只能使用=或in运算符
这就是我解决问题的方法。
具有复制行为的模型执行如下验证
<?php
namespace AppORM;
use CakeORMTable as ORMTable;
class Table extends ORMTable
{
protected static $replicateTables = [
'inteacherkeyjoin',
];
public function isValidReplicateCondition(array $conditions)
{
return count(array_filter($conditions, function ($v, $k) {
return (bool) preg_match('/^[s]*[pf]k(' . implode('|', self::$replicateTables) . ')id[s]*((in|=).*)?$/i', strtolower(($k === intval($k)) ? $v : $k));
}, ARRAY_FILTER_USE_BOTH)) > 0;
}
public function validateUpdateDeleteCondition($action, $conditions)
{
if ($this->behaviors()->has('Replicate')) {
if (!is_array($conditions)) {
throw new Exception("When calling {$action} for replicate tables, you need to pass an array");
} elseif (!$this->isValidReplicateCondition($conditions)) {
throw new Exception("Unsafe condition was passed to the {$action} action, you need to specify primary keys or foreign keys with = or IN operators");
}
}
}
public function query()
{
return new Query($this->getConnection(), $this);
}
}
对于Query
类,我们有一个isReplicate
方法来触发我们需要的验证,并且where
方法已被覆盖,以确保条件得到正确验证:
/**
* True if and only if:
* - _repository is properly initialized
* - _repository has the Replicate behavior
* - The current connection is not c
*/
protected function isReplicate()
{
if (($this->type() !== 'select') && ($this->getConnection()->configName() === 'c') && ($this->getParentConnectionType() !== 'd')) {
throw new Exception('Replica tables must always be changed from a district connection');
}
if (in_array($this->type(), ['update', 'delete'])) {
$this->_repository->validateUpdateDeleteCondition($this->type(), $this->conditions);
}
return ($this->_repository && $this->_repository->behaviors()->has('Replicate'));
}
public function where($conditions = null, $types = [], $overwrite = false)
{
$preparedConditions = is_array($conditions) ? $conditions : [$conditions];
$this->conditions = array_merge($this->conditions, $preparedConditions);
return parent::where($conditions, $types, $overwrite);
}