Flink Streaming - 使用StreamingFilesInk时更改部分文件名



我正在尝试使用flink流以消费kafka主题消息并创建(定期(将保存在S3上的parquet文件。
使用带有批量格式的流文件接收器来更改创建的部分文件名(或添加后缀/前缀(比Part-0-0或Part-1-3?

StreamingFileSink<> sink = StreamingFileSink.forBulkFormat(new Path("s3://test-bucket/"),               ParquetAvroFactory.getParquetWriter(schema,  CompressionCodec.UNCOMPRESSED.name()))
.withBucketAssigner(new PartitionBucketAssigner(partitionColumns))
.build();

您可以覆盖getBucketid方法(请参阅https://ci.apache.org/projects/flink/flink/flink/flink/flink-docs-master/api/java/java/java/java/org/opache/flink/flink/flink/streaming/streaming一下bucketAssigner上的/pi/functions/sink/filesystem/bucketassigner.html(,它将影响路径,但显然不是零件文件名(请参阅下面的注释(。

>

零件文件名是在org.apache.flink.streaming.api.functions.sink.filesystem.Bucket中的这一点代码中建立的:

private Path assembleNewPartPath() {
    return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
}

似乎不是被设计为自定义的。

相关内容

  • 没有找到相关文章

最新更新