My .proto file contains a map-type field:
message Foo {
  ...
  map<string, uint32> fooMap = 19;
}
I'm consuming messages from a Kafka source and trying to write them to an S3 bucket as Parquet files. The relevant part of the code looks like this:
val basePath = "s3a:// ..."
env
  .fromSource(source, WatermarkStrategy.noWatermarks(), "source")
  .map(x => toJavaProto(x))
  .sinkTo(
    FileSink
      .forBulkFormat(new Path(basePath), ParquetProtoWriters.forType(classOf[Foo]))
      .withOutputFileConfig(
        OutputFileConfig
          .builder()
          .withPartPrefix("foo")
          .withPartSuffix(".parquet")
          .build()
      )
      .build()
  )
  .setParallelism(1)
env.execute()
A Parquet file does get written to S3, but it appears to be corrupted. When I try to read it with the Avro/Parquet Viewer plugin, I see this error:
Unable to process file .../Downloads/foo-9366c15f-270e-4939-ad88-b77ee27ddc2f-0.parquet
java.lang.UnsupportedOperationException: REPEATED not supported outside LIST or MAP. Type: repeated group fooMap = 19 {
  optional binary key (STRING) = 1;
  optional int32 value = 2;
}
	at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:277)
	at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:264)
	at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:134)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
	at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
	at uk.co.hadoopathome.intellij.viewer.fileformat.ParquetFileReader.getRecords(ParquetFileReader.java:99)
	at uk.co.hadoopathome.intellij.viewer.FileViewerToolWindow$2.doInBackground(FileViewerToolWindow.java:193)
	at uk.co.hadoopathome.intellij.viewer.FileViewerToolWindow$2.doInBackground(FileViewerToolWindow.java:184)
	at java.desktop/javax.swing.SwingWorker$1.call(SwingWorker.java:304)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.desktop/javax.swing.SwingWorker.run(SwingWorker.java:343)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Flink version 1.15, proto2.
There were breaking changes between parquet-format and parquet-mr. I'm not familiar with Flink, but I guess you have to configure org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters to generate the right format.
I use parquet-mr directly and ran into the same problem: the Avro Parquet reader cannot read a Parquet file generated by the following code:
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.proto.ProtoParquetWriter;
import org.apache.parquet.proto.ProtoWriteSupport;
...
var conf = new Configuration();
// Write repeated and map fields in the old, non-spec-compliant layout
ProtoWriteSupport.setWriteSpecsCompliant(conf, false);
var builder = ProtoParquetWriter.builder(file)
    .withMessage(Xxx.class)
    .withCompressionCodec(CompressionCodecName.GZIP)
    .withWriteMode(Mode.OVERWRITE)
    .withConf(conf);
try (var writer = builder.build()) {
    writer.write(pb.toBuilder());
}
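Reading the file back through parquet-avro reproduces the error outside the IDE (per the stack trace, the viewer plugin goes through the same AvroReadSupport path). A minimal sketch, reusing the same file path as the writer above:

import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
...
// Fails with "REPEATED not supported outside LIST or MAP" when the file
// was written with writeSpecsCompliant = false
try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(file).build()) {
    GenericRecord record;
    while ((record = reader.read()) != null) {
        System.out.println(record);
    }
}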
If the configuration value is changed to true, it succeeds:
ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
Looking at its source code, we can see that this function simply sets the boolean property parquet.proto.writeSpecsCompliant in the configuration.
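So if all you can reach is the Hadoop Configuration, setting the key directly should have the same effect (a minimal sketch, relying only on the property name above):

// Same effect as ProtoWriteSupport.setWriteSpecsCompliant(conf, true)
conf.setBoolean("parquet.proto.writeSpecsCompliant", true);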
In the source code of ParquetProtoWriters.forType, it creates a factory with the builder class ParquetProtoWriterBuilder, which internally uses org.apache.parquet.proto.ProtoWriteSupport. I guess you could give it a properly configured ProtoWriteSupport; a sketch of that idea follows.
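This is a guess, not something I have verified against Flink 1.15: a minimal sketch that mirrors what ParquetProtoWriters.forType does but forces the specs-compliant layout on. The class name SpecsCompliantProtoWriters is mine, and it assumes Flink's public ParquetWriterFactory/ParquetBuilder API and a ProtoParquetWriter.builder overload that accepts an OutputFile:

import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.proto.ProtoParquetWriter;
import org.apache.parquet.proto.ProtoWriteSupport;

public class SpecsCompliantProtoWriters {
    // Hypothetical drop-in for ParquetProtoWriters.forType that writes
    // the spec-compliant LIST/MAP layout
    public static <T extends com.google.protobuf.Message> ParquetWriterFactory<T> forType(Class<T> protoClass) {
        ParquetBuilder<T> builder = out -> {
            Configuration conf = new Configuration();
            ProtoWriteSupport.setWriteSpecsCompliant(conf, true); // standard LIST/MAP groups
            return ProtoParquetWriter.<T>builder(out)
                .withMessage(protoClass)
                .withConf(conf)
                .build();
        };
        return new ParquetWriterFactory<>(builder);
    }
}

You would then swap it into the sink, e.g. forBulkFormat(new Path(basePath), SpecsCompliantProtoWriters.forType(classOf[Foo])).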
I also installed the Python tool https://pypi.org/project/parquet-tools/ to inspect Parquet files. A list field generated in the old format looks like this:
...
############ Column(f2) ############
name: f2
path: f1.f2
...
and in the new format it looks like:
...
############ Column(element) ############
name: element
path: f1.f2.list.element
...
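By analogy (my extrapolation from the LogicalTypes spec linked below, not tool output I captured), the map field from the question should nest under a repeated key_value group in the new format, with paths like:

path: fooMap.key_value.key
path: fooMap.key_value.value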
Hope this gives you some pointers.
References:
- https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/LogicalTypes.md#nested-types
- https://github.com/apache/parquet-format/pull/51
- https://github.com/apache/parquet-mr/pull/463