我正在研究一个使用BucketingSink类写入hdfs文件的poc。即使数据正在写入 hdfs 文件,但这些文件在 hdfs 上以".pending"为由。
下面是我正在使用的代码。有人可以帮助我确定问题并帮助我解决此问题吗?
BucketingSink<String> HdfsSink = new BucketingSink<String>("hdfs://xxxx/xxxx/xxxx/Test/");
HdfsSink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
HdfsSink.setBatchSize(1024 * 1024 * 2); // this is 2 MB,
HdfsSink.setInactiveBucketCheckInterval(10000L);
HdfsSink.setInactiveBucketThreshold(10000L);
嗨,你可以使用它。
嗨,未完成的存储桶具有 .pending 扩展名。一旦存储桶关闭(例如,对于时间分桶,一旦时间结束(,该文件将被重命名。由于您使用的是NonRollingBucketer,因此文件将永远不会关闭。我建议您使用DateTimeBucketer。
作为旁注:我建议您稍微增加检查点间隔。 123 毫秒非常频繁,应用程序看起来不像是极其关键的延迟。像 2000 毫秒这样的值可能更合适。
我发现文件保持为.pending 的实际原因是......因为我还没有启用检查点。一旦我启用了检查点...文件已成功关闭,而不会显示为 .pending。
您可以通过设置env.enableCheckpointing(<duration>)
来启用检查点
请查看网址@https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html 了解更多详情。