我打算在这里将从kafka收到的消息转换为拼花文件,但我可能错了。你能帮我做这个话题吗?
private static SinkFunction<String> createFileSink(String outputPath) {
final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024)
.build())
.build();
return sink;
}
您应该使用批量编码格式来编写Parquet。RowFormat用于编写文本、csv、json等。