Mongo CDC throws BSONObjectTooLarge. How can I ignore it and continue?



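I only want to listen to 3 collections in the database: c1, c2 and c3. I can't figure out how to restrict the change stream to just these 3 collections. My code is below.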

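  1. I want to ignore this error and carry on. How do I do that? In this case the cursor itself never gets created.
  2. As I said above, is there a way to restrict the stream to only collections c1, c2 and c3 on the database side? The code below watches the entire database and then filters the collections on the Java side.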
// Only forward insert, delete and update events
List<Bson> pipeline = singletonList(match(in("operationType", asList("insert", "delete", "update"))));
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor;
String resumeTokenStr = getResumeTokenFromS3(cdcConfig);
if (resumeTokenStr == null) {
    // No saved token: the stream starts from the current point in time
    cursor = mongoClient.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
} else {
    // Resume right after the last processed event
    BsonDocument resumeToken = BsonDocument.parse(resumeTokenStr);
    cursor = mongoClient.watch(pipeline).batchSize(1).maxAwaitTime(60, TimeUnit.SECONDS).startAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
}
return cursor;

The code above throws the following error:

com.mongodb.MongoCommandException: Command failed with error 10334 (BSONObjectTooLarge): 'BSONObj size: 16795345 (0x10046D1) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: "826337A73B0000000A2B022C0100296E5A1004B317A529F739433BA840730515AC0EAC46645F6964006462624E8146E0FB000934F6560004" }' on server crm-mongo-report01.prod.phenom.local:27017. The full response is {"operationTime": {"$timestamp": {"t": 1664707966, "i": 25}}, "ok": 0.0, "errmsg": "BSONObj size: 16795345 (0x10046D1) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: "826337A73B0000000A2B022C0100296E5A1004B317A529F739433BA840730515AC0EAC46645F6964006462624E8146E0FB000934F6560004" }", "code": 10334, "codeName": "BSONObjectTooLarge", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1664707966, "i": 26}}, "signature": {"hash": {"$binary": {"base64": "NZDJKhCse19Eud88kNh7XRWRgas=", "subType": "00"}}, "keyId": 7113062344413937666}}}
at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:198)
at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:413)
at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:337)
at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:116)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:644)
at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:240)
at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:226)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:126)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:116)
at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:345)
at com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:232)
at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:214)
at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$2(OperationHelper.java:575)
at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:600)
at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$3(OperationHelper.java:574)
at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:600)
at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:573)
at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:211)
at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:65)
at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:217)
at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:197)
at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:195)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:347)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:343)
at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:538)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:343)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:58)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:191)
at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:221)
at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:174)
at com.company.cdc.services.CDCMain.getCursorAtResumeToken(CdcServiceMain.java:217)

Line 217 points to this line: cursor = mongoClient.watch(pipeline).batchSize(1).maxAwaitTime(60, TimeUnit.SECONDS).startAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();

This is how I eventually solved it (just in case anyone else is looking for a solution to this problem):

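  1. Removed FullDocument.UPDATE_LOOKUP when creating the cursor. My code now looks like cursor = mongoClient.watch(pipeline).batchSize(20000).cursor(); This avoids pulling in the huge fields that may exist in a document and that eventually caused the error. This did the trick.

  2. In my case I did not need to listen to updates on the collection that had such bad data. So I changed the cursor to watch only the databases and collections I am interested in, instead of watching the whole database and then ignoring the unwanted collections. Here is the code:

  3. When writing to the destination, I did a bulk lookup against MongoDB to build the full documents and then wrote them. This deferred-lookup approach greatly reduced the memory footprint of the Java program.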


private List<Bson> generatePipeline(CdcConfigs cdcConfig) {
    List<String> whiteListedCollections = getWhitelistedCollection(cdcConfig);
    List<String> whiteListedDbs = getWhitelistedDbs(cdcConfig);
    log.info("whitelisted dbs:" + whiteListedDbs + " coll:" + whiteListedCollections);
    // The $match runs on the server, so events from other namespaces are never sent to the client
    List<Bson> pipeline;
    if (!whiteListedDbs.isEmpty()) {
        pipeline = singletonList(match(and(
                in("ns.db", whiteListedDbs),
                in("ns.coll", whiteListedCollections),
                in("operationType", asList("insert", "delete", "update")))));
    } else {
        // No database whitelist: filter on collection name only, across all databases
        pipeline = singletonList(match(and(
                in("ns.coll", whiteListedCollections),
                in("operationType", asList("insert", "delete", "update")))));
    }
    return pipeline;
}

private MongoChangeStreamCursor<ChangeStreamDocument<Document>> getCursorAtResumeToken(CdcConfigs cdcConfig, MongoClient mongoClient) {
    List<Bson> pipeline = generatePipeline(cdcConfig);
    MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor;
    String resumeTokenStr = getResumeTokenFromS3(cdcConfig);
    if (resumeTokenStr == null) {
        // cursor = mongoClient.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
        cursor = mongoClient.watch(pipeline).batchSize(20000).cursor();
        log.warn("RESUME TOKEN IS NULL. READING CDC FROM CURRENT TIMESTAMP FROM MONGO DB !!! ");
    } else {
        BsonDocument resumeToken = BsonDocument.parse(resumeTokenStr);
        // Without fullDocument(UPDATE_LOOKUP), update events carry documentKey and updateDescription, not the whole post-image
        cursor = mongoClient.watch(pipeline).batchSize(20000).maxAwaitTime(30, TimeUnit.MINUTES).startAfter(resumeToken).cursor();
        // cursor = mongoClient.watch(pipeline).startAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
    }
    return cursor;
}
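Step 3 (the deferred bulk lookup) is only described in prose. Below is a minimal sketch of the buffering side, assuming the consumer records each insert/update event's namespace and documentKey._id instead of requesting fullDocument. The class DeferredLookupBuffer and its methods are hypothetical; they are not part of the MongoDB driver or of the code above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

// Hypothetical helper: buffers document keys from change events so the full documents
// can be fetched later in bulk, instead of asking the change stream for fullDocument.
class DeferredLookupBuffer {
    // namespace ("db.coll") -> ids of changed documents; LinkedHashSet keeps arrival order and drops repeats
    private final Map<String, LinkedHashSet<Object>> pending = new LinkedHashMap<>();
    private final int batchSize;

    DeferredLookupBuffer(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Record one insert/update event by its namespace and documentKey._id
    void add(String namespace, Object id) {
        pending.computeIfAbsent(namespace, k -> new LinkedHashSet<>()).add(id);
    }

    // Returns (namespace, ids) pairs with at most batchSize ids each, then empties the buffer
    List<Map.Entry<String, List<Object>>> drainBatches() {
        List<Map.Entry<String, List<Object>>> batches = new ArrayList<>();
        for (Map.Entry<String, LinkedHashSet<Object>> e : pending.entrySet()) {
            List<Object> ids = new ArrayList<>(e.getValue());
            for (int i = 0; i < ids.size(); i += batchSize) {
                List<Object> chunk = new ArrayList<>(ids.subList(i, Math.min(i + batchSize, ids.size())));
                batches.add(Map.entry(e.getKey(), chunk));
            }
        }
        pending.clear();
        return batches;
    }
}
```

With the Java driver, each batch would then become a single query such as database.getCollection(coll).find(Filters.in("_id", ids)) instead of one lookup per event. Delete events have no document left to fetch, so they would be written to the destination directly rather than buffered. Because repeated ids collapse, a document updated many times within one batch is fetched once, at its latest state.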

The solutions offered so far lean towards Python code, so translating them into Java is a challenge.

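My case is a bit different. I have a trigger that syncs documents from one collection to another, with the option to include fullDocument in the change event. The problem was that the metadata added to the event, together with fullDocument, exceeded the maximum BSON size (16MB).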

I solved it by removing fullDocument from the trigger and instead running an aggregation that reads the document from the collection itself:

collection1.aggregate([
  { $match: { _id: changeEvent.documentKey._id } },
  { $merge: { into: "collection", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
]);
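Since Java translations came up above, here is a rough Java-driver equivalent of that aggregation, for a consumer that runs it itself rather than inside a trigger. It is a sketch, assuming the synchronous driver in a version that provides Aggregates.merge and MergeOptions, and assuming the _id is taken from the change event (for example event.getDocumentKey().get("_id")). The class and method names are hypothetical; "collection" is the target name from the snippet.

```java
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.MergeOptions;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.Arrays;
import java.util.List;

// Hypothetical helper mirroring the shell snippet: copy one document into a target collection
class MergeOnChange {
    // $match on _id, then $merge: replace when the _id exists in the target, insert otherwise
    static List<Bson> syncPipeline(Object id, String targetCollection) {
        MergeOptions options = new MergeOptions()
                .uniqueIdentifier("_id")
                .whenMatched(MergeOptions.WhenMatched.REPLACE)
                .whenNotMatched(MergeOptions.WhenNotMatched.INSERT);
        return Arrays.asList(
                Aggregates.match(Filters.eq("_id", id)),
                Aggregates.merge(targetCollection, options));
    }

    // toCollection() executes a pipeline ending in $merge without returning documents to the client
    static void syncDocument(MongoCollection<Document> source, Object id, String targetCollection) {
        source.aggregate(syncPipeline(id, targetCollection)).toCollection();
    }
}
```

Only the single matched document passes through the pipeline, so the change event no longer has to carry it together with its metadata.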
