MSK S3 sink does not work with FieldPartitioner



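I am using AWS MSK and MSK Connect. The S3 sink connector stops working properly when I add io.confluent.connect.storage.partitioner.FieldPartitioner. Note: without the FieldPartitioner, the S3 sink works. Apart from this Stack Overflow question, I could not find any resources.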

Error

ERROR [FieldPart-sink|task-0] Value is not Struct type. (io.confluent.connect.storage.partitioner.FieldPartitioner:81)
Caused by: io.confluent.connect.storage.errors.PartitionException: Error encoding partition.
ERROR [Sink-FieldPartition|task-0] WorkerSinkTask{id=Sink-FieldPartition-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Error encoding partition. (org.apache.kafka.connect.runtime.WorkerSinkTask:612)

MSK Connect config

connector.class=io.confluent.connect.s3.S3SinkConnector
format.class=io.confluent.connect.s3.format.avro.AvroFormat
flush.size=1
schema.compatibility=BACKWARD
tasks.max=2
topics=MSKTutorialTopic
storage.class=io.confluent.connect.s3.storage.S3Storage
topics.dir=mskTrials
s3.bucket.name=clickstream
s3.region=us-east-1
partitioner.class=io.confluent.connect.storage.partitioner.FieldPartitioner
partition.field.name=name
value.converter.schemaAutoRegistrationEnabled=true
value.converter.registry.name=datalake-schema-registry
value.convertor.schemaName=MSKTutorialTopic-value
value.converter.avroRecordType=GENERIC_RECORD
value.converter.region=us-east-1
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

Data schema stored in the Glue Schema Registry

{
  "namespace": "example.avro",
  "type": "record",
  "name": "UserData",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "favourite_color",
      "type": [
        "string",
        "null"
      ]
    }
  ]
}

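In order to partition by fields, your data needs actual fields.

StringConverter cannot parse the data it consumes into a record with fields, so the value reaches the partitioner as a plain string rather than a Struct, which is what the "Value is not Struct type" error reports. If the topic contains Avro data, use an Avro converter. Also, Avro always has a schema, so remove the schemas.enable setting.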
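As a rough sketch of the advice above, the converter section of the connector config might look like the fragment below. It assumes the records were written with the AWS Glue Schema Registry serializer and that the Glue Schema Registry Kafka Connect converter (class AWSKafkaAvroConverter) is packaged in the custom plugin alongside the S3 sink; neither is confirmed by the question. If the data was serialized against a Confluent Schema Registry instead, io.confluent.connect.avro.AvroConverter with a value.converter.schema.registry.url would be the equivalent. All other connector settings stay as they are.

```properties
# Assumption: the Glue Schema Registry converter JAR is part of the MSK Connect custom plugin.
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter.region=us-east-1
value.converter.registry.name=datalake-schema-registry
# Spelled "converter" here; the original config has "value.convertor.schemaName",
# which would most likely not be passed to the converter.
value.converter.schemaName=MSKTutorialTopic-value
value.converter.avroRecordType=GENERIC_RECORD
value.converter.schemaAutoRegistrationEnabled=true
# value.converter.schemas.enable is removed: Avro records always carry a schema.

# Keys are left as plain strings, as in the original config.
key.converter=org.apache.kafka.connect.storage.StringConverter

# Unchanged partitioner settings; "name" must be a field of the Avro record.
partitioner.class=io.confluent.connect.storage.partitioner.FieldPartitioner
partition.field.name=name
```

With a Struct value available, FieldPartitioner should write objects under a prefix derived from the field, along the lines of mskTrials/MSKTutorialTopic/name=<value>/ in the clickstream bucket.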
