Flink doesn't checkpoint, and BucketingSink leaves files in a pending state, when the source is generated from a Collection



> I'm trying to generate some test data from a collection and write that data to S3. When I do this, Flink doesn't seem to do any checkpointing at all, but it does checkpoint when the source reads from S3.

For example, this DOES checkpoint and leaves the output files in a completed state:

```scala
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L) // checkpoint every 2 seconds
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

// Unbounded source: watch the S3 directory continuously, re-scanning every 5 seconds.
val lines: DataStream[String] = {
  val path = "s3a://my_bucket/simple_job/in"
  env.readFile(
    inputFormat = new TextInputFormat(new Path(path)),
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = 5000L
  )
}

val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)
env.execute()
```

Meanwhile, this does NOT checkpoint and leaves the files in a .pending state even after the job has finished:

```scala
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

// Bounded source: the job finishes as soon as all 100 elements are emitted.
val lines: DataStream[String] = env.fromCollection((1 to 100).map(_.toString))
val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
lines.addSink(sinkFunction)
env.execute()
```

It turns out this is because of this ticket: https://issues.apache.org/jira/browse/FLINK-2646. The stream from the collection simply finishes before the application has time to make a single checkpoint, and since BucketingSink only moves files out of the pending state when it is notified that a checkpoint has completed, the files stay pending forever.
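
One possible workaround, sketched below, is to replace `fromCollection` with a custom `SourceFunction` that emits the same elements and then stays alive long enough for at least one checkpoint to complete before the job finishes. The class name and the `holdOpenMillis` parameter are hypothetical, not from the original post:

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Hypothetical helper (not part of the original post): emits a fixed
// collection, then keeps the source alive so a checkpoint can complete
// before the job transitions to FINISHED.
class HeldOpenCollectionSource(elements: Seq[String], holdOpenMillis: Long)
    extends SourceFunction[String] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    // Emit each element under the checkpoint lock, per the SourceFunction contract.
    elements.foreach { e =>
      ctx.getCheckpointLock.synchronized {
        ctx.collect(e)
      }
    }
    // Instead of returning immediately (which ends the job), wait long
    // enough for the checkpoint interval to fire at least once.
    val deadline = System.currentTimeMillis() + holdOpenMillis
    while (running && System.currentTimeMillis() < deadline) {
      Thread.sleep(100L)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
```

With the same environment setup as above, `env.addSource(new HeldOpenCollectionSource((1 to 100).map(_.toString), holdOpenMillis))` would replace the `fromCollection` line. Note that the hold-open time needs to exceed the sink's inactive-bucket threshold (60 seconds by default, adjustable via `setInactiveBucketThreshold`) plus one checkpoint interval, so that the part file is rolled to pending and then finalized by a subsequent checkpoint.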
