如何使用 Kafka Connect 从 Kafka 消息中检索传入的标头,并通过 MongoDB Sink Connector 将其作为附加数据字段存储到 MongoDB 中?
我有一个 Kafka 主题 "PROJECT_EXAMPLE_TOPIC"。如您所见,我已经能够保存消息时间戳、传入的消息数据以及 mongo 文档的创建/更新日期。
我希望能在某处有一个用于提取标头的函数。
示例kafka值
// incoming kafka value
{
"msgId" : "exampleId"
}
- 如何获取原始标头header_foo
//expected example
{
"_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
"_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"message_source" : "mongo_connector",
"message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
"message_topic" : "PROJECT_EXAMPLE_TOPIC",
"msgId" : "exampleId",
"message_header_foo" : "header_foo_value"
}
- 如何获取所有 Kafka 标头
//expected example
{
"_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
"_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"message_source" : "mongo_connector",
"message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
"message_topic" : "PROJECT_EXAMPLE_TOPIC",
"msgId" : "exampleId",
"message_headers" : {
"header_001" : "header_001_value",
"header_002" : "header_002_value",
...
"header_x" : "header_x_value"
}
}
这是我的配置:
{
"name": "sink-mongo-PROJECT-EXAMPLE",
"config": {
"topics": "PROJECT_EXAMPLE_TOPIC",
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schema.registry.url": "SCHEMA_REGISTRY_URL",
"key.converter.schemas.enable": "false",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.basic.auth.user.info": "SCHEMA_REGISTRY_API_KEY_AND_SECRET",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "SCHEMA_REGISTRY_URL",
"value.converter.schemas.enable": "false",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "SCHEMA_REGISTRY_API_KEY_AND_SECRET",
"connection.uri": "PROJECT_REFERENTIAL_MONGO_URL",
"database": "PROJECT_DB_NAME",
"collection": "EXAMPLE",
"max.num.retries": "3",
"retries.defer.timeout": "5000",
"key.projection.type": "none",
"key.projection.list": "",
"field.renamer.mapping": "[]",
"field.renamer.regex": "[]",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
"post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
"value.projection.list": "msgId",
"value.projection.type": "whitelist",
"writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",
"delete.on.null.values": "false",
"max.batch.size": "0",
"rate.limiting.timeout": "0",
"rate.limiting.every.n": "0",
"change.data.capture.handler": "",
"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true,
"transforms": "InsertSource,InsertTopic,InsertTimestamp",
"transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSource.static.field": "message_source",
"transforms.InsertSource.static.value": "mongo_connector",
"transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTopic.topic.field": "message_topic",
"transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTimestamp.timestamp.field": "message_timestamp"
}
}
这是一个有点老的问题,但有一个第三方的消息转换(Single Message Transform),可以将标头转换为键或值中的字段:
https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-transform-common/transformations/HeaderToField.html
不过,这不允许您获取所有标头,您需要按名称指定要提取的标头及其类型。