I am using the Kafka sink connector for MongoDB. I want to push some JSON documents from a Kafka topic into MongoDB, but I get an error whenever the document contains $oid.
Here is the error:
{"name":"mongodb-sink-connector","connector":{"state":"RUNNING","worker_id":"localhost:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.ntat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:610)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)ntat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)ntat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)ntat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)ntat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)ntat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)ntat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)ntat java.base/java.lang.Thread.run(Thread.java:829)nCaused by: org.apache.kafka.connect.errors.DataException: Failed to write mongodb documentsntat com.mongodb.kafka.connect.sink.MongoSinkTask.bulkWriteBatch(MongoSinkTask.java:227)ntat java.base/java.util.ArrayList.forEach(ArrayList.java:1541)ntat com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:122)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)nt... 10 morenCaused by: java.lang.IllegalArgumentException: Invalid BSON field name $oidntat org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:534)ntat com.mongodb.internal.connection.BsonWriterDecorator.writeName(BsonWriterDecorator.java:193)ntat org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:117)ntat org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:42)ntat org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)ntat org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:139)ntat org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:118)ntat org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:42)ntat com.mongodb.internal.connection.SplittablePayload$WriteRequestEncoder.encode(SplittablePayload.java:221)ntat com.mongodb.internal.connection.SplittablePayload$WriteRequestEncoder.encode(SplittablePayload.java:187)ntat org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:63)ntat org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29)ntat com.mongodb.internal.connection.BsonWriterHelper.writeDocument(BsonWriterHelper.java:77)ntat com.mongodb.internal.connection.BsonWriterHelper.writePayload(BsonWriterHelper.java:59)ntat com.mongodb.internal.connection.CommandMessage.encodeMessageBodyWithMetadata(CommandMessage.java:162)ntat com.mongodb.internal.connection.RequestMessage.encode(RequestMessage.java:138)ntat com.mongodb.internal.connection.CommandMessage.encode(CommandMessage.java:59)ntat com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:268)ntat com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:100)ntat com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:490)ntat 
com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)ntat com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:253)ntat com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:202)ntat com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:118)ntat com.mongodb.internal.operation.MixedBulkWriteOperation.executeCommand(MixedBulkWriteOperation.java:431)ntat com.mongodb.internal.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:251)ntat com.mongodb.internal.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:76)ntat com.mongodb.internal.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:194)ntat com.mongodb.internal.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:185)ntat com.mongodb.internal.operation.OperationHelper.withReleasableConnection(OperationHelper.java:621)ntat com.mongodb.internal.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:185)ntat com.mongodb.internal.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:76)ntat com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:187)ntat com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:442)ntat com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:422)ntat com.mongodb.kafka.connect.sink.MongoSinkTask.bulkWriteBatch(MongoSinkTask.java:209)nt... 13 moren"}],"type":"sink"}
Here is the document I produce to the Kafka topic:
{"_id": {"$oid": "634fd99b52281517a468f3a7"},"schema": {"type": "struct", "fields": [{"type": "int32","optional": true, "field": "id"}, {"type": "string", "optional": true, "field": "name"}, {"type": "string", "optional": true, "field": "middel_name"}, {"type": "string", "optional": true, "field": "surname"}],"optional": false, "name": "foobar"},"payload": {"id":45,"name":"mongo","middle_name": "mmp","surname": "kafka"}}
Here is the connector configuration I am using:
{
  "name": "mongodb-sink-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "migration-mongo",
    "connection.uri": "mongodb://abc:xyz@xx.xx.xx.01:27018,xx.xx.xx.02:27018,xx.xx.xx.03:27018/?authSource=admin&replicaSet=dev",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "document.id.strategy.overwrite.existing": "false",
    "validate.non.null": false,
    "database": "foo",
    "collection": "product"
  }
}
A Kafka Connect JsonConverter value should contain only the schema and payload fields, not _id, and that envelope requires "value.converter.schemas.enable": "true". If you leave it set to "false", you can instead drop schema and payload and put _id directly in the value itself, as sketched below...
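As a minimal sketch of both shapes (field names are taken from your document; the plain-string _id is an assumption for illustration, since the {"$oid": ...} extended-JSON wrapper is not parsed by JsonConverter and would arrive as a literal field named $oid, which is exactly what BSON rejects):

With "value.converter.schemas.enable": "true", _id lives inside payload and is declared in the schema:

{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "foobar",
    "fields": [
      {"type": "string", "optional": false, "field": "_id"},
      {"type": "int32", "optional": true, "field": "id"},
      {"type": "string", "optional": true, "field": "name"},
      {"type": "string", "optional": true, "field": "middle_name"},
      {"type": "string", "optional": true, "field": "surname"}
    ]
  },
  "payload": {
    "_id": "634fd99b52281517a468f3a7",
    "id": 45,
    "name": "mongo",
    "middle_name": "mmp",
    "surname": "kafka"
  }
}

With "value.converter.schemas.enable": "false", the same record flattens to:

{"_id": "634fd99b52281517a468f3a7", "id": 45, "name": "mongo", "middle_name": "mmp", "surname": "kafka"}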
The ID the Mongo client uses is typically tied to the Kafka record key itself, not to anything embedded in the value portion you have shown, but that depends on the document ID strategy.
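If you do want the connector to take _id from the value, you can point its document.id.strategy setting at one of the built-in strategies; a sketch of the relevant config fragment, assuming _id sits at the top level of the value as in the flattened example above:

"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy"

By default the sink uses BsonOidStrategy, which generates a fresh ObjectId for every document, so an _id embedded in the value would otherwise be ignored.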