是否可以使用单个 s3 接收器连接器为不同主题使用不同类型的 Avro 架构来指向时间戳字段的相同字段名称?



schema for topic t1

{
"type": "record",
"name": "Envelope",
"namespace": "t1",
"fields": [
{
"name": "before",
"type": [
"null",
{
"type": "record",
"name": "Value",
"fields": [
{
"name": "id",
"type": {
"type": "long",
"connect.default": 0
},
"default": 0
},
{
"name": "createdAt",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
},

],
"connect.name": "t1.Value"
}
],
"default": null
},
{
"name": "after",
"type": [
"null",
"Value"
],
"default": null
}

],
"connect.name": "t1.Envelope"
}

schema

{
"type": "record",
"name": "Value",
"namespace": "t2",
"fields": [
{
"name": "id",
"type": {
"type": "long",
"connect.default": 0
},
"default": 0
},
{
"name": "createdAt",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
}
],
"connect.name": "t2.Value"
}

s3-sink连接器配置

connector.class=io.confluent.connect.s3.S3SinkConnector
behavior.on.null.values=ignore
s3.region=us-west-2
partition.duration.ms=1000
flush.size=1
tasks.max=3
timezone=UTC
topics.regex=t1,t2
aws.secret.access.key=******
locale=US
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter.schemas.enable=false
name=s3-sink-connector
aws.access.key.id=******
errors.tolerance=all
value.converter=org.apache.kafka.connect.json.JsonConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
s3.bucket.name=s3-sink-connector-bucket
path.format=YYYY/MM/dd
timestamp.extractor=RecordField
timestamp.field=after.createdAt

通过使用这个连接器配置,我得到了t2主题的错误"createdAt字段不存在">.如果我设置时间戳。field = createdAt如果t1主题"createdAt字段不存在",则抛出错误.

如何指向"createdAt">字段在两个模式中同时使用相同的连接器?

是否可以通过使用单个s3-sink连接器配置来实现这一点?

如果这种情况是可能的,那么我该如何做到这一点,我必须使用哪些属性来实现这一点?

如果有人对此有想法,请提出建议。如果有其他方法,也请建议。

所有主题都需要相同的时间戳字段;没有办法配置主题到字段的映射。

t2模式没有after字段,因此需要运行两个单独的连接器

该字段也需要出现在所有记录中,否则分区程序将无法工作。

最新更新