如何使用 pg-promise 独立执行一批交易?



我们在主数据同步后端函数中遇到了问题。我们客户的移动设备每天都在推送更改,但是上周他们警告我们,主Web应用程序中的一些更改没有更新。

在日志中进行一些调查后,我们发现确实存在单个事务失败和回滚。但是,似乎此事务之前的所有事务也会回滚。

代码以这种方式工作。要同步的数据是一个"变更集"数组,每个变更集可以一次更新多个表。重要的是,Changset 必须完全更新或根本不更新,因此每个 changset 都包含在事务中。然后每个事务一个接一个地执行。如果事务失败,其他事务不应受到影响。

我怀疑所有事务实际上都是以某种方式组合的,可能是通过主 db.task。我们不只是循环执行事务,而是使用 db.task 批量执行它们,避免在同一表上发生更新冲突。

任何建议我们如何批量执行这些事务并避免此回滚问题?

谢谢,这是同步代码的片段:

// Begin task that will execute transactions one after the other

db.task(task => {

const transactions = [];
// Create a transaction for each changeset (propriete/fosse/inspection)
Object.values(data).forEach((change, index) => {
const logchange = { tx: index };
const c = {...change}; // Use a clone of the original change object
transactions.push(
task.tx(t => {
const queries = [];
// Propriete
if (Object.keys(c.propriete.params).length) {
const params = proprietes.parse(c.propriete.params);
const propriete = Object.assign({ idpropriete: c.propriete.id }, params);
logchange.propriete = { idpropriete: propriete.idpropriete };
queries.push(t.one(`SELECT ${Object.keys(params).join()} FROM propriete WHERE idpropriete = $1`, propriete.idpropriete).then(previous => {
logchange.propriete.previous = previous;
return t.result('UPDATE propriete SET' + qutil.setequal(params) + 'WHERE idpropriete = ${idpropriete}', propriete).then(result => {
logchange.propriete.new = params;
})
}));
}
else delete c.propriete;
// Fosse
if (Object.keys(c.fosse.params).length) {
const params = fosses.parse(c.fosse.params);
const fosse = Object.assign({ idfosse: c.fosse.id }, params);
logchange.fosse = { idfosse: fosse.idfosse };
queries.push(t.one(`SELECT ${Object.keys(params).join()} FROM fosse WHERE idfosse = $1`, fosse.idfosse).then(previous => {
logchange.fosse.previous = previous;
return t.result('UPDATE fosse SET' + qutil.setequal(params) + 'WHERE idfosse = ${idfosse}', fosse).then(result => {
logchange.fosse.new = params;
})
}));
}
else delete c.fosse;
// Inspection (rendezvous)
if (Object.keys(c.inspection.params).length) {
const params = rendezvous.parse(c.inspection.params);
const inspection = Object.assign({ idvisite: c.inspection.id }, params);
logchange.rendezvous = { idvisite: inspection.idvisite };
queries.push(t.one(`SELECT ${Object.keys(params).join()} FROM rendezvous WHERE idvisite = $1`, inspection.idvisite).then(previous => {
logchange.rendezvous.previous = previous;
return t.result('UPDATE rendezvous SET' + qutil.setequal(params) + 'WHERE idvisite = ${idvisite}', inspection).then(result => {
logchange.rendezvous.new = params;
})
}));
}
else delete change.inspection;
// Cheminees
c.cheminees = Object.values(c.cheminees).filter(cheminee => Object.keys(cheminee.params).length);
if (c.cheminees.length) {
logchange.cheminees = [];
c.cheminees.forEach(cheminee => {
const params = cheminees.parse(cheminee.params);
const ch = Object.assign({ idcheminee: cheminee.id }, params);
const logcheminee = { idcheminee: ch.idcheminee };
queries.push(t.one(`SELECT ${Object.keys(params).join()} FROM cheminee WHERE idcheminee = $1`, ch.idcheminee).then(previous => {
logcheminee.previous = previous;
return t.result('UPDATE cheminee SET' + qutil.setequal(params) + 'WHERE idcheminee = ${idcheminee}', ch).then(result => {
logcheminee.new = params;
logchange.cheminees.push(logcheminee);
})
}));
});
}
else delete c.cheminees;
// Lock from further changes on the mobile device
// Note: this change will be sent back to the mobile in part 2 of the synchronization
queries.push(t.result('UPDATE rendezvous SET timesync = now() WHERE idvisite = $1', [c.idvisite]));
console.log(`transaction#${++transactionCount}`);
return t.batch(queries).then(result => { // Transaction complete
logdata.transactions.push(logchange);
});
})
.catch(function (err) { // Transaction failed for this changeset, rollback
logdata.errors.push({ error: err, change: change }); // Provide error message and original change object to mobile device
console.error(JSON.stringify(logdata.errors));
})
);
});
console.log(`Total transactions: ${transactions.length}`);
return task.batch(transactions).then(result => { // All transactions complete
// Log everything that was uploaded from the mobile device
log.log(res, JSON.stringify(logdata));
});

我很抱歉,当问题在太多层面上出错时,这几乎不可能做出最终的好答案......

更改集

必须完全更新或根本不更新,这一点很重要,因此每个更改集都包装在事务中。

如果更改集需要数据完整性,则整个事务必须是一个事务,而不是一组事务。

然后每个事务一个接一个地执行。如果事务失败,其他事务不应受到影响。

同样,数据完整性是单个事务所保证的,您需要将其变成一个事务,而不是多个事务。

我怀疑所有事务实际上都是以某种方式组合的,可能是通过主 db.task。

它们被组合在一起,不是通过task,而是通过方法tx

任何建议我们如何批量执行这些事务并避免此回滚问题?

通过将它们合并到单个事务中。

您将在顶部使用单个tx调用,仅此而已,那里不需要任何任务。如果下面的代码使用自己的事务,您可以更新它以允许条件事务。

此外,在构建复杂事务时,应用程序从使用 pg-promise-demo 中显示的存储库模式中受益匪浅。您可以在存储库中拥有支持条件事务的方法。

你应该重做你的代码,以避免它做的可怕的事情,比如手动查询格式。例如,永远不要使用像SELECT ${Object.keys(params).join()}这样的东西,这是灾难的秘诀。使用 pg-promise 为您提供的正确查询格式,例如本例中的 SQL 名称。

最新更新