使用Kafka Source时终止Flink作业



当我的生产者完成了将所有消息流式传输到Kafka,并且在Flink完成处理之后,我希望能够终止Flink作业,这样它就不会继续运行,并且我可以知道Flink何时完成了对所有数据的处理。我也不能使用批处理,因为我需要Flink与我的Kafka流并行运行。

通常,Flink在DeserializationSchema类中使用isEndOfStream方法来查看它是否应该提前结束(在方法中返回true将自动结束作业(。然而,当使用Kafka作为Flink的源时,新的KafkaSource类已经否决了在反序列化器中使用isEndOfStream方法,并且不再检查它是否应该结束流。有没有其他方法可以提前终止Flink的工作?

KafkaSource提供的在有界流上操作的机制是将setBoundedsetUnbounded与生成器一起使用,如

KafkaSource<String> source = KafkaSource
.<String>builder()
.setBootstrapServers(...)
.setGroupId(...)
.setTopics(...)
.setDeserializer(...) // or setValueOnlyDeserializer
.setStartingOffsets(...)
.setBounded(...) // or setUnbounded
.build();

setBounded指示一旦源已经消耗了通过指定偏移的所有数据,就应该停止该源。

setUnbounded可以用来指示,虽然源不应该读取超过指定偏移量的任何数据,但它应该保持运行。这允许源在STREAMING模式下运行时参与检查点。

如果你事先知道你想读多少书,这很好。我使用了带有特定时间戳的setBounded,例如

.setBounded(
OffsetsInitializer.timestamp(
Instant.parse("2021-10-31T23:59:59.999Z").toEpochMilli()))

也像这个

.setBounded(OffsetsInitializer.latest())

最新更新