Kafka S3中PARQUET格式的源连接器



我有使用Protobuf生成的主题事件。我可以使用Parquet格式的S3 sink连接器成功地将主题事件沉入到S3桶中。现在我的S3桶中有类型为.parquet.key.parquet的对象。使用以下配置,所有这些都按照预期工作:

{
"name": "s3-sink",
"config": {
"name": "s3-sink",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"keys.format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url": "https://my-schema-registry",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "MY_SR_API_KEY:MY_SR_API_SECRET",
"store.kafka.keys": true,
"parquet.codec": "none",
"partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
"locale": "en-US",
"s3.bucket.name": "my-bucket-123",
"s3.region": "eu-west-1",
"time.interval": "HOURLY",
"flush.size": 1000,
"tasks.max": 1,
"topics.regex": "test-topic.*",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "my-bootstrap-server",
"confluent.topic.replication.factor": 3,
"confluent.license.topic.replication.factor": 1,
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username="MY_API_KEY" password="MY_API_SECRET";",
"confluent.topic.sasl.mechanism": "PLAIN",
"confluent.topic.ssl.endpoint.identification.algorithm": "https"
}
}
}

现在我想把我的键和值在my-bucket-123(parquet格式)到一个Kafka主题使用Protobuf。为此,我使用以下配置通过Confluent设置了一个新的S3源连接器(confluentinc/kafka-connect-s3-source:1.4.5):

{
"name": "s3-source",
"config": {
"name": "s3-source",
"dest.kafka.bootstrap.servers": "my-bootstrap-server",
"dest.topic.replication.factor": 1,
"dest.kafka.security.protocol": "SASL_SSL",
"dest.kafka.sasl.mechanism": "PLAIN",
"dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username="MY_API_KEY" password="MY_API_SECRET";",
"tasks.max": 1,
"connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "my-bootstrap-server",
"confluent.topic.replication.factor": 3,
"confluent.license.topic.replication.factor": 1,
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username="MY_API_KEY" password="MY_API_SECRET";",
"confluent.topic.sasl.mechanism": "PLAIN",
"confluent.topic.ssl.endpoint.identification.algorithm": "https",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": ".*",
"transforms.AddPrefix.replacement": "copy_of_$0",
"s3.region": "eu-west-1",
"s3.bucket.name": "my-bucket-123"
}
}

通过使用上述配置,我无法启动S3源连接器。如果我使用上面的配置和命令验证配置:

curl -X PUT -d @config.json --header "content-Type:application/json" http://localhost:8083/connector-plugins/S3SourceConnector/config/validate

我在format.class属性中得到以下错误:

"errors":[
"Invalid value io.confluent.connect.s3.format.parquet.ParquetFormat for configuration format.class: Class io.confluent.connect.s3.format.parquet.ParquetFormat could not be found.",
"Invalid value null for configuration format.class: Class must extend: io.confluent.connect.cloud.storage.source.StorageObjectFormat"
]

我开始认为这个S3源连接器不支持Parquet格式。我试着对JSON、AVRO和BYTE格式进行验证,它们都是可以的。

深入研究S3源连接器jar文件(1.4.5),我没有找到Parquet格式的文件:

Jar文件中的格式

有人知道这是怎么回事吗?是否有其他方法可以将数据从S3 - Parquet格式放回我的Kafka集群?

谢谢!

Confluent的S3源连接器文档:

开箱即用,连接器支持以Avro和JSON格式从S3读取数据。除了带模式的记录外,连接器还支持在文本文件中导入不带模式的纯JSON记录,每行一条记录。通常,连接器可以接受提供format接口实现的任何格式。

所以这意味着你应该能够添加/插件实现parquet格式,但它不是内置的开箱即用

格式的源代码:

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/parquet/ParquetFormat.java

相关内容

  • 没有找到相关文章

最新更新