>我正在尝试使用集合生成一些测试数据,并将该数据写入 s3,当我这样做时,Flink 似乎根本没有做任何检查点,但是当源来自 s3 时它确实做了检查点。
例如,此 DO 检查点并使输出文件保持已完成状态:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))
val lines: DataStream[String] = {
val path = "s3a://my_bucket/simple_job/in"
env
.readFile(
inputFormat = new TextInputFormat(new Path(path)),
filePath = path,
watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
interval = 5000L
)
}
val sinkFunction: BucketingSink[String] =
new BucketingSink[String]("s3a://my_bucket/simple_job/out")
.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
lines.addSink(sinkFunction)
env.execute()
同时,这不会检查点,即使在作业完成后也会使文件处于 .pending 状态:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))
val lines: DataStream[String] = env.fromCollection((1 to 100).map(_.toString))
val sinkFunction: BucketingSink[String] =
new BucketingSink[String]("s3a://my_bucket/simple_job/out")
.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
lines.addSink(sinkFunction)
env.execute()
事实证明,这是因为这张票:https://issues.apache.org/jira/browse/FLINK-2646,只是因为来自集合的流在应用程序有时间创建单个检查点之前完成。