使用promise.map的每个promise值作为下一个循环的输入



我做了一整天的研究,研究如何在Promise.map中获得每个承诺的结果,并在同一Promise.map的循环期间将其用作下一次迭代的输入。严格来说,我需要使用这种方法,因为我在数据库操作中使用了这种逻辑,必须是ATOMIC,并且在拒绝任何承诺的情况下,必须回滚所有以前的事务。

注意:我使用了promise.each,它运行良好,只是它不允许我关联个人承诺,并在失败时回滚所有。因此,当仔细解析每个promise并返回值而不在下一个循环中导致Error: Transaction query already complete时,Promise.map似乎是最好的解决方案。以下是knex的逻辑:

var obj={};
knex.transaction(function(trx) {
return Promise.map(array, function(item) {
return trx.insert(item).into('table')
.then(returnedFields => {
//do some validation/operation and return the result or the returnedFields themselves as input in the next loop.
//[START EDIT: this responds to comment by @ Mikael Lepistö for clarity]
//update obj here to be used in next loop
//[END EDIT]
});
}, {concurrency: 1});
})
.then(function(inserts) {
console.log(inserts.length + 'Items saved.');
})
.catch(function(error) {
console.error(error);
})

看看这个:

var promise1 = Promise.resolve(3);
var promise2 = 42;
var promise3 = new Promise(function(resolve, reject) {
setTimeout(resolve, 100, 'foo');
});
Promise.all([promise1, promise2, promise3]).then(function(values) {
console.log(values);
});

https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/all

您可以查看async.js库https://caolan.github.io/async/docs.html

以下是你可以做的

knex.transaction(function(trx) {
return async.mapValuesSeries(array, (item) => {
return trx.insert(item).into('table')
.then(returnedFields => {
//do some validation/operation and return the result or the returnedFields themselves as input in the next loop.
});
}, (error, inserts) => {
if (err) { 
console.error(error);
}
console.log(inserts.length + 'Items saved.');
});
});

感谢所有发表精彩/更深刻见解的人。然而,事实证明Promise.eachknex配合良好,并且我在循环期间错误地在一些单独的promisesthen内部调用commit,从而导致Error: Transaction query already complete在后续循环的下一次事务尝试中原因:没有必要在knex事务上下文/块中调用commit,因为它是在该上下文中自动触发的。

在下面的答案中注意:

KnexPromise.each一起使用需要监听每个promise的then块内部可能的拒绝,并使用try/catch,在某些情况下显式拒绝,否则后续的promise/值将继续循环,这不能使数据库成为原子

knex.transaction(function(trx) {
return Promise.map(array, function(item) {
return trx.insert(item).into('table')
.then(returnedFields => {
try{
//do some validation/operation and return the result or the returnedFields themselves as input in the next loop.
/* START Testing a case of rejection */
if (array.indexOf(item) === 3) {
//uncomment the code below to test
//throw new Error('BreakException');
}
/* END */
}catch(err){
fail=true;
}
}).then(val => {
if (fail) {
trx.rollback()//you can ignore this as is automatically triggered here - tested
return Promise.reject(new Error('Rejected'))
}
return val
})
.catch(err => {
fail = true
trx.rollback()//you can ignore this as is automatically triggered here - tested
return Promise.reject(new Error('Rejected'))
});
});
})
.then(function(inserts) {
console.log(inserts.length + 'Items saved.');
})
.catch(function(error) {
console.error(error);
})

相关内容

最新更新