使用ParquetRecordSetWriter时,NiFi合并记录处理器不符合最小存储箱大小



我正试图在NiFi(1.11.4(中构建一个流,该流从AMQ读取Avro消息,使用Merge Records处理器累积这些消息,然后将合并的镶木地板文件写入HDFS。

问题是,当我试图在MergeRecord处理器中使用ParquetRecordSetWriter时(与AvroReader结合使用(-合并后的内容永远不会根据设置的最小垃圾箱大小阈值发出-我试图设置非常低的值-它就是不起作用。同时,最大垃圾箱年龄阈值工作得很好。

此外,如果我使用AvroRecordSetWriter-最小大小阈值也可以。因此,我尝试使用AvroRecordSetWriter,然后使用PutParquet(或ConvertAvroToParquet(,并面临另一个问题:如果我为镶木地板文件设置了行组大小(例如128 MB(,那么就永远不会写入小文件。

看起来它可以缓冲内存中的内容,但它真的应该这样做吗?因为在我为测试AvroParquetWriter(本质上与NiFi使用的程序相同(而编写的简单Java程序中,我能够编写具有巨大行组大小集的小文件。

在NiFi中是否存在与拼花地板书写相关的已知问题?我对描述的行为感到非常困惑。感谢任何协助。

提前谢谢。

我在1.12.0上运行类似的流,但我在记录计数和最大仓龄而不是最小仓大小上发射。效果很好。这里可能有一件事对你起作用,那就是我注意到Parquet的输出大约是Avro二进制文件大小的50%,在我们的数据集中非常一致。所以,如果你根据Avro来猜测bin内存大小,那么Parquet可能会出错。

最新更新