重新启动Flink程序后,为什么会有许多过程中的文件



我使用flink消耗kafka,并以镶木格式将其保存到HDFS。现在,我发现我的目标目录中的许多inprogress文件,当我重新启动Flink程序时,该文件不会在Target Dir中关闭。

我的envs:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(60000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getConfig.registerTypeWithKryoSerializer(classOf[MyMessage],classOf[ProtobufSerializer])

//sinks
    val bucketAssigner = new DateTimeBucketAssigner[myCounter]("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"))
    val streamingFileSink = StreamingFileSink.
      forBulkFormat(path, ParquetAvroWriters.forSpecificRecord(classOf[myCounter]))
      .withBucketCheckInterval(60000)
      .withBucketAssigner(bucketAssigner).build
-rw-r--r--   3 Administrator hdfs       1629 2019-08-05 17:06 /user/data/2019-08-05/.part-2-0.inprogress.722265d7-1082-4c84-b70d-da2a08092f5d
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:07 /user/data/2019-08-05/.part-2-1.inprogress.ac0d8b56-b8f0-4893-9e55-5374b69f16cc
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:08 /user/data/2019-08-05/.part-2-2.inprogress.a427c2e2-d689-42b8-aa3d-77873c5654f2
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:09 /user/data/2019-08-05/.part-2-3.inprogress.b5c746e3-354d-4ab3-b1a4-8c6bd88ae430
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 16:59 /user/data/2019-08-05/.part-2-3.inprogress.e286d995-3fa7-4696-b51a-27378412a35c
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:00 /user/data/2019-08-05/.part-2-4.inprogress.bcde4f30-2f78-4f54-92ad-9bc54ac57c5c
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:10 /user/data/2019-08-05/.part-2-4.inprogress.dbce8a00-6514-43dc-8b31-36c5a8665d37
-rw-r--r--   3 Administrator hdfs          0 2019-08-05 17:10 /user/data/2019-08-05/.part-2-5.inprogress.34e53418-f5af-4279-87ef-6a27549d90fe
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:01 /user/data/2019-08-05/.part-2-5.inprogress.936cdb63-4fe2-41bf-b839-2861030c5516
-rw-r--r--   3 Administrator hdfs          0 2019-08-05 16:55 /user/data/2019-08-05/.part-2-6.inprogress.7a7099a6-9dcd-450b-af2c-8a676276ef0a
-rw-r--r--   3 Administrator hdfs          0 2019-08-05 17:01 /user/data/2019-08-05/.part-2-6.inprogress.b57f548f-45fc-497c-9807-ef18dba3d11d
-rw-r--r--   3 Administrator hdfs       1574 2019-08-05 16:56 /user/data/2019-08-05/part-2-0
-rw-r--r--   3 Administrator hdfs       1868 2019-08-05 16:57 /user/data/2019-08-05/part-2-1
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 16:58 /user/data/2019-08-05/part-2-2
-rw-r--r--   3 Administrator hdfs       1661 2019-08-05 16:53 /user/data/2019-08-05/part-2-3
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 16:54 /user/data/2019-08-05/part-2-4

我认为原因是,当我重新启动程序时,过程中的文件不关闭,我感到困惑的是,为什么这些文件在重新启动后不会关闭,即使新文件也会成为过程中。有人可以说明吗?

简称,恰好是语义。

PLS首先从Flink Offical Blog中阅读此帖子。

然后,让我尝试清楚地解释它。

  1. bucketingsink将所有记录写入temp文件,默认情况下用后缀在过程中。

  2. 当在此水槽上进行检查点的时间到来时,Flink会将fo的名称保存到程序内文件中;

  3. 当提交提交的时候,Flink会将过程中的文件重命名为最终名称,在您的示例中,它们是Part-X-X文件。

并且,当您重新启动Flink应用程序时,Flink作业将从最后一个保存点重新启动(如果您设置了参数(,并且许多尚未准备就绪提交的过程中的文件将被放弃,切勿被读取(从DOT开始用户将不会由HDFS列出(。

当然,我忽略了许多详细信息,例如,文件将在卷上超过配置时将文件重命名为.pendent。

您需要用flink shell提交应用程序,以使应用程序从保存点恢复,例如:
./bin/flink run -s <savepointPath> ...,检查此信息以获取更多详细信息。
StreamingFileSink将处理InProgress文件。

最新更新