ParquetProtoWriters creates an unreadable parquet file



My .proto file contains a field of map type:

message Foo {
  ...
  map<string, uint32> fooMap = 19;
}

I'm consuming messages from a Kafka source and trying to write them as parquet files to an S3 bucket. The relevant part of the code looks like this:

val basePath = "s3a:// ..."
env
  .fromSource(source, WatermarkStrategy.noWatermarks(), "source")
  .map(x => toJavaProto(x))
  .sinkTo(
    FileSink
      .forBulkFormat(new Path(basePath), ParquetProtoWriters.forType(classOf[Foo]))
      .withOutputFileConfig(
        OutputFileConfig
          .builder()
          .withPartPrefix("foo")
          .withPartSuffix(".parquet")
          .build()
      )
      .build()
  )
  .setParallelism(1)
env.execute()

A parquet file does get written to S3, but it appears to be corrupted. When I try to read it with the Avro/Parquet Viewer plugin, I see this error:

Unable to process file …/Downloads/foo-9366c15f-270e-4939-ad88-b77ee27ddc2f-0.parquet
java.lang.UnsupportedOperationException: REPEATED not supported outside LIST or MAP. Type: repeated group fooMap = 19 {
  optional binary key (STRING) = 1;
  optional int32 value = 2;
}
    at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:277)
    at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:264)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:134)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at uk.co.hadoopathome.intellij.viewer.fileformat.ParquetFileReader.getRecords(ParquetFileReader.java:99)
    at uk.co.hadoopathome.intellij.viewer.FileViewerToolWindow$2.doInBackground(FileViewerToolWindow.java:193)
    at uk.co.hadoopathome.intellij.viewer.FileViewerToolWindow$2.doInBackground(FileViewerToolWindow.java:184)
    at java.desktop/javax.swing.SwingWorker$1.call(SwingWorker.java:304)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.desktop/javax.swing.SwingWorker.run(SwingWorker.java:343)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Flink version 1.15, proto 2.

There were breaking changes between parquet-format and parquet-mr. I'm not familiar with Flink, but I guess you have to configure org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters to generate the correct format.
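For context: the old writer encodes a map field directly as a repeated group, which is exactly what the error above complains about, while the spec-compliant layout described in the parquet-format LogicalTypes document (first reference below) wraps it in a group annotated with MAP. Roughly (the compliant version is my sketch of the documented shape, not captured output):

old, non-compliant layout, as in the error message:

repeated group fooMap = 19 {
  optional binary key (STRING) = 1;
  optional int32 value = 2;
}

spec-compliant MAP layout:

optional group fooMap (MAP) = 19 {
  repeated group key_value {
    required binary key (STRING) = 1;
    optional int32 value = 2;
  }
}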


I used parquet-mr directly and ran into the same problem. The avro parquet reader cannot read a parquet file generated by the following code:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.proto.ProtoParquetWriter;
import org.apache.parquet.proto.ProtoWriteSupport;
...
var conf = new Configuration();
// false = the old, non-spec-compliant layout for repeated/map fields
ProtoWriteSupport.setWriteSpecsCompliant(conf, false);
var builder = ProtoParquetWriter.builder(file)
        .withMessage(Xxx.class)
        .withCompressionCodec(CompressionCodecName.GZIP)
        .withWriteMode(Mode.OVERWRITE)
        .withConf(conf);
try (var writer = builder.build()) {
    writer.write(pb.toBuilder());
}

It succeeds if you change the configuration value to true:

ProtoWriteSupport.setWriteSpecsCompliant(conf, true);

Looking at its source code, you can see that this function simply sets the boolean value parquet.proto.writeSpecsCompliant in the configuration.
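So the minimal equivalent is to set that key on the Hadoop Configuration yourself:

// same effect as ProtoWriteSupport.setWriteSpecsCompliant(conf, true)
conf.setBoolean("parquet.proto.writeSpecsCompliant", true);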

In the source code of ParquetProtoWriters.forType, it creates a factory with the builder class ParquetProtoWriterBuilder, which internally uses org.apache.parquet.proto.ProtoWriteSupport. I guess you could hand it a correctly configured ProtoWriteSupport.
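I haven't used Flink, so treat this as a sketch: since Flink's ParquetProtoWriterBuilder is a private class, one option would be to clone its shape and flip the flag inside getWriteSupport. The class name SpecCompliantProtoWriters is made up; everything else is the standard Flink / parquet-mr API:

import com.google.protobuf.Message;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.proto.ProtoWriteSupport;

public class SpecCompliantProtoWriters {

    // Mirrors ParquetProtoWriters.forType, but forces spec-compliant output.
    public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> protoClass) {
        ParquetBuilder<T> builder = out -> new Builder<>(out, protoClass).build();
        return new ParquetWriterFactory<>(builder);
    }

    private static class Builder<T extends Message>
            extends ParquetWriter.Builder<T, Builder<T>> {

        private final Class<T> protoClass;

        Builder(OutputFile out, Class<T> protoClass) {
            super(out);
            this.protoClass = protoClass;
        }

        @Override
        protected Builder<T> self() {
            return this;
        }

        @Override
        protected WriteSupport<T> getWriteSupport(Configuration conf) {
            // ProtoWriteSupport.init(conf) later reads this flag from the same
            // Configuration object, so setting it here is enough.
            ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
            return new ProtoWriteSupport<>(protoClass);
        }
    }
}

Then you would sink with FileSink.forBulkFormat(path, SpecCompliantProtoWriters.forType(classOf[Foo])) instead of ParquetProtoWriters.forType. Again, this is untested; treat it as a starting point.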


I also installed this Python tool, https://pypi.org/project/parquet-tools/, to inspect parquet files. A list field generated by the old format looks like this:

...
############ Column(f2) ############
name: f2
path: f1.f2
...

and with the new format it looks like:

...
############ Column(element) ############
name: element
path: f1.f2.list.element
...
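For what it's worth, those dumps came from something like parquet-tools inspect <file>.parquet; I'm quoting the subcommand from memory, so check the tool's help for exact usage.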

Hope this gives you some pointers.

References

  • https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/LogicalTypes.md#nested-types
  • https://github.com/apache/parquet-format/pull/51
  • https://github.com/apache/parquet-mr/pull/463
