Parquet writer: org.apache.parquet.io.ParquetEncodingExcepti



我正在使用Apache Parquet Hadoop - ParquetRecordWriter与MapReduce并点击ParquetEncodingException: writing empty page。尽管我发现,当 valueCount 为 0 时,这发生在 ColumnWriterBase 中,但我并没有取消此属性为 0 的真正原因,为什么它与 Endoding 有关系,这种状态如何发生?知道吗?感谢您的任何提示。

Error: org.apache.parquet.io.ParquetEncodingException: writing empty page
at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:309)
at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:152)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:27)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)

版本: org.apache.parquet:parquet-hadoop:1.11.0

我正在使用自己的 WriteSupport 类:

public class MyDataWriteSupport extends WriteSupport<MyData> {
private RecordConsumer recordConsumer;
public MyDataWriteSupport() {}
@Override
public WriteContext init(Configuration configuration) {
Map<String, String> metaData = new HashMap<>();
return new WriteContext(getSchema(), metaData);
}
public static MessageType getSchema() {
return MessageTypeParser.parseMessageType(
" message MyData { "
+ "optional binary key (UTF8);"
+ "optional int64 length;"
+ "repeated int32 datarray;"
+ "repeated group myobj {n"
+ "   optional int32 id;"
+ "   optional binary title (UTF8);"
+ "}"
+ " }");
}
@Override
public void prepareForWrite(RecordConsumer recordConsumer) {
this.recordConsumer = recordConsumer;
}
@Override
public void write(MyData record) {
recordConsumer.startMessage();
writeData(record);
recordConsumer.endMessage();
}
private void writeData(MyData record) {
recordConsumer.startMessage();
addStringValue(recordConsumer, 0, "key", record.getKey());
addLongValue(recordConsumer, 1, "length", record.getLength());
addIntegerArrayValues(recordConsumer, 2, "datarray", record.getDataArray());
if (!record.getMyObjects().isEmpty()) {
recordConsumer.startField("myobj", 3);
record
.getMyObject()
.forEach(
obj -> {
recordConsumer.startGroup();
addIntValue(recordConsumer, 0, "id", obj.id);
addStringValue(recordConsumer, 1, "title", obj.title);
recordConsumer.endGroup();
});
recordConsumer.endField("myobj", 3);
}
recordConsumer.endMessage();
}
private void addIntValue(RecordConsumer recordConsumer, int index, String fieldName, int value) {
recordConsumer.startField(fieldName, index);
recordConsumer.addInteger(value);
recordConsumer.endField(fieldName, index);
}
private static void addIntegerArrayValues(
RecordConsumer recordConsumer, int index, String fieldName, int[] is) {
if (is.length > 0) {
recordConsumer.startField(fieldName, index);
Arrays.stream(is).forEach(labelIndex -> recordConsumer.addInteger(labelIndex));
recordConsumer.endField(fieldName, index);
}
}
private static void addLongValue(
RecordConsumer recordConsumer, int index, String fieldName, long value) {
recordConsumer.startField(fieldName, index);
recordConsumer.addLong(value);
recordConsumer.endField(fieldName, index);
}
private static void addStringValue(
RecordConsumer recordConsumer, int index, String fieldName, String value) {
recordConsumer.startField(fieldName, index);
recordConsumer.addBinary(Binary.fromString(value));
recordConsumer.endField(fieldName, index);
}
}

我认为问题出在开始/结束调用上。一个问题是startMessage()endMessage()被调用两次,一次在write(MyData)中,一次在writeData(MyData)中。 我建议使用ValidatingRecordConsumer作为您使用的记录消费者的包装器。这样,如果记录序列化出现问题,您可能会获得更有意义的异常。

最新更新