我正在尝试使用flink流以消费kafka主题消息并创建(定期(将保存在S3上的parquet文件。
使用带有批量格式的流文件接收器来更改创建的部分文件名(或添加后缀/前缀(比Part-0-0或Part-1-3?
StreamingFileSink<> sink = StreamingFileSink.forBulkFormat(new Path("s3://test-bucket/"), ParquetAvroFactory.getParquetWriter(schema, CompressionCodec.UNCOMPRESSED.name()))
.withBucketAssigner(new PartitionBucketAssigner(partitionColumns))
.build();
您可以覆盖getBucketid方法(请参阅https://ci.apache.org/projects/flink/flink/flink/flink/flink-docs-master/api/java/java/java/java/org/opache/flink/flink/flink/streaming/streaming一下bucketAssigner上的/pi/functions/sink/filesystem/bucketassigner.html(,它将影响路径,但显然不是零件文件名(请参阅下面的注释(。
>零件文件名是在org.apache.flink.streaming.api.functions.sink.filesystem.Bucket
中的这一点代码中建立的:
private Path assembleNewPartPath() {
return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
}
似乎不是被设计为自定义的。