Committing and rolling back a transaction in MongoDB Java



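Currently, I am using the following code:

(I have marked two important lines in the code. Everything between them is just a few operations on existing documents.)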

clientSession.startTransaction();  // Important Line 1.
Document walletDetailDoc = new Document("identifier", "walletBalanceDistribution");
Document foundWalletDetailDoc1 = (Document) walletDistributionCollection.find(walletDetailDoc).first();
walletDetailDoc = new Document("identifier", to + "Backup");
Document foundWalletDetailDoc2 = (Document) walletDistributionCollection.find(walletDetailDoc).first();
walletDetailDoc = new Document("identifier", from + "Backup");
Document foundWalletDetailDoc3 = (Document) walletDistributionCollection.find(walletDetailDoc).first();
assert foundWalletDetailDoc1 != null;
assert foundWalletDetailDoc2 != null;
assert foundWalletDetailDoc3 != null;
Bson updateWalletDoc = walletDetailDoc;
Bson updateWalletDocOperation = new Document("$set", updateWalletDoc);
walletDistributionCollection.updateOne(foundWalletDetailDoc1, updateWalletDocOperation);
updateWalletDoc = new Document("identifier", "walletBalanceDistribution")
.append("totalRTKBalanceForPool", foundWalletDetailDoc2.get("totalRTKBalanceForPool"))
.append("lastCheckedBlockNumber", foundWalletDetailDoc2.get("lastCheckedBlockNumber"))
.append("lastCheckedTransactionIndex", foundWalletDetailDoc2.get("lastCheckedTransactionIndex"))
.append("balanceCollectedAsFees", foundWalletDetailDoc2.get("balanceCollectedAsFees"));
updateWalletDocOperation = new Document("$set", updateWalletDoc);
//////// TEMPORARY... TO BE REMOVED
Thread.sleep(5000);
clientSession.abortTransaction();  // Important Line 2
boolean abc = true;
if(abc) {
return;
}
////////

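Now, I expect that once I call abortTransaction(), the collection will revert to the state it was in before // Important Line 1. But this does not happen. The operations are saved to MongoDB as I perform them, and they are not reverted when // Important Line 2 finishes executing.

(In case it matters: everything is inside a try-catch, and no errors occur between any of the lines. The code successfully reaches the return statement.)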

Also, in case it is needed, I am using the following Maven dependency:

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.2.0</version>
</dependency>

How do I do this correctly, so that either all of the operations complete or none of them do (i.e. rollback/abort)?

You need to perform the write operations through the session, inside its transaction. For example:

Original collection state:

mongos> db.tra.find()
{ "_id" : ObjectId("601088bac5b64c7d32c73a8a"), "today" : ISODate("2021-01-26T21:25:14.835Z") }
{ "_id" : ObjectId("60108901c5b64c7d32c73a8b"), "today" : ISODate("2021-01-26T21:26:25.264Z") }
mongos> 

1. Now start a session:

mongos> session = db.getMongo().startSession()

2. Start a transaction in that session:

mongos> session.startTransaction()

3. Insert a test document into the collection inside the transaction:

mongos> session.getDatabase("test").tra.insert({a:"this will be aborted",today : new Date()})
WriteResult({ "nInserted" : 1 })

4. Now check the collection. The document is not there (the transaction has not been committed yet):

mongos> db.tra.find()
{ "_id" : ObjectId("601088bac5b64c7d32c73a8a"), "today" : ISODate("2021-01-26T21:25:14.835Z") }
{ "_id" : ObjectId("60108901c5b64c7d32c73a8b"), "today" : ISODate("2021-01-26T21:26:25.264Z") }

5. Now abort the transaction:

mongos> session.abortTransaction()

6. As you can see, nothing was inserted (the transaction was aborted):

mongos> db.tra.find()
{ "_id" : ObjectId("601088bac5b64c7d32c73a8a"), "today" : ISODate("2021-01-26T21:25:14.835Z") }
{ "_id" : ObjectId("60108901c5b64c7d32c73a8b"), "today" : ISODate("2021-01-26T21:26:25.264Z") }
mongos> 

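Of course, if you replace step 5 with session.commitTransaction(), the transaction is committed and you will see the operations applied to the collection.

Here is a good Java example: https://www.mongodb.com/blog/post/java-and-mongodb-40-support-for-multidocument-acid-transactions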
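
For comparison, the shell steps above might look roughly like this with the synchronous Java driver. This is a minimal sketch, not code from the linked post: the class name AbortDemo, the helper testDocument, and the connection string are assumptions made for illustration. It presumes a replica set or sharded cluster, since transactions are not available on a standalone server.

```java
import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

import java.util.Date;

public class AbortDemo {
    // Builds the same test document that the shell example inserts.
    static Document testDocument() {
        return new Document("a", "this will be aborted").append("today", new Date());
    }

    public static void main(String[] args) {
        // Assumption: a replica set or mongos is reachable at this URI.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017");
             ClientSession session = client.startSession()) {
            MongoCollection<Document> tra = client.getDatabase("test").getCollection("tra");

            session.startTransaction();
            // Passing the session binds this insert to the open transaction.
            tra.insertOne(session, testDocument());

            // Counted without the session: the uncommitted insert is not visible.
            long outside = tra.countDocuments();
            // Counted with the session: the transaction sees its own write.
            long inside = tra.countDocuments(session);
            System.out.println("outside=" + outside + ", inside=" + inside);

            session.abortTransaction();
            // After the abort, the insert has been discarded.
            System.out.println("after abort=" + tra.countDocuments());
        }
    }
}
```

The point to notice is that only the calls that receive the session take part in the transaction. A call made without it runs outside the transaction and is applied immediately.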

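The correct answer is quite simple...

(Found via the link at the very bottom of @R2D2's answer.)

(TL;DR: pass clientSession as the first argument to the updateOne method.)

All we have to do is: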

clientSession.startTransaction();  // Important Line 1.
Document walletDetailDoc = new Document("identifier", "walletBalanceDistribution");
Document foundWalletDetailDoc1 = (Document) walletDistributionCollection.find(walletDetailDoc).first();
walletDetailDoc = new Document("identifier", to + "Backup");
Document foundWalletDetailDoc2 = (Document) walletDistributionCollection.find(walletDetailDoc).first();
walletDetailDoc = new Document("identifier", from + "Backup");
Document foundWalletDetailDoc3 = (Document) walletDistributionCollection.find(walletDetailDoc).first();
assert foundWalletDetailDoc1 != null;
assert foundWalletDetailDoc2 != null;
assert foundWalletDetailDoc3 != null;

Bson updateWalletDoc = walletDetailDoc;
Bson updateWalletDocOperation = new Document("$set", updateWalletDoc);
walletDistributionCollection.updateOne(clientSession, foundWalletDetailDoc1, updateWalletDocOperation);
updateWalletDoc = new Document("identifier", "walletBalanceDistribution")
.append("totalRTKBalanceForPool", foundWalletDetailDoc2.get("totalRTKBalanceForPool"))
.append("lastCheckedBlockNumber", foundWalletDetailDoc2.get("lastCheckedBlockNumber"))
.append("lastCheckedTransactionIndex", foundWalletDetailDoc2.get("lastCheckedTransactionIndex"))
.append("balanceCollectedAsFees", foundWalletDetailDoc2.get("balanceCollectedAsFees"));
updateWalletDocOperation = new Document("$set", updateWalletDoc);

//////// TEMPORARY... TO BE REMOVED
Thread.sleep(5000);
clientSession.abortTransaction();  // Important Line 2
boolean abc = true;
if(abc) {
return;
}
////////
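
To get the all-or-nothing behaviour asked about in the question, the same idea can be wrapped in a commit-or-abort pattern. The following is only a sketch under assumptions: the class WalletRestore and the helpers backupFilter and restoreFromBackup are hypothetical names, and the update shown (copying four fields from a backup document onto the walletBalanceDistribution document) is one reading of what the code above intends. The reads also receive the session here so that they run inside the transaction.

```java
import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class WalletRestore {
    // Filter for a backup document, following the "<name>Backup" naming used above.
    static Document backupFilter(String owner) {
        return new Document("identifier", owner + "Backup");
    }

    // Hypothetical helper: copies the backup fields onto the live document,
    // committing only if every step succeeds.
    static void restoreFromBackup(MongoClient client,
                                  MongoCollection<Document> wallets,
                                  String owner) {
        try (ClientSession session = client.startSession()) {
            session.startTransaction();
            try {
                // Reads take the session too, so they run inside the transaction.
                Document backup = wallets.find(session, backupFilter(owner)).first();
                if (backup == null) {
                    throw new IllegalStateException("no backup document for " + owner);
                }
                Document fields = new Document("totalRTKBalanceForPool", backup.get("totalRTKBalanceForPool"))
                        .append("lastCheckedBlockNumber", backup.get("lastCheckedBlockNumber"))
                        .append("lastCheckedTransactionIndex", backup.get("lastCheckedTransactionIndex"))
                        .append("balanceCollectedAsFees", backup.get("balanceCollectedAsFees"));
                Document live = new Document("identifier", "walletBalanceDistribution");
                // The session argument is what makes this write part of the transaction.
                wallets.updateOne(session, live, new Document("$set", fields));
            } catch (RuntimeException e) {
                // Any failure before the commit discards every write made so far.
                session.abortTransaction();
                throw e;
            }
            session.commitTransaction();
        }
    }
}
```

The commit sits outside the inner try block on purpose, so abortTransaction() is only called while the transaction is still open. The driver also provides clientSession.withTransaction(...), which wraps a similar start/commit/abort sequence around a callback and may be a simpler fit.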
