How to commit offsets with KafkaSource in Java Flink



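I want to process messages from Kafka and then commit their offsets, and once Flink has consumed and processed all of the messages, end the job, using the TaskManager and heartbeat to upgrade the process.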

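```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(address)
        .setTopics(inputTopic)
        .setGroupId(consumerGroup)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // Kafka consumer's own periodic commit; used when checkpointing is disabled
        .setProperty("enable.auto.commit", "true")
        // Commit offsets when a checkpoint completes (already the default)
        .setProperty("commit.offsets.on.checkpoint", "true")
        .build();
DataStream<String> stream = environment.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
ObjectMapper mapper = new ObjectMapper();
stream.map((value) -> {
    // ... processing logic (truncated in the original post) ...
    return value;
}).print();
environment.execute();
```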

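If you want the job to stop, you should run it as a batch job rather than a streaming job, which requires every source, including the KafkaSource, to be bounded. More information here: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/.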
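A minimal sketch of such a bounded job, assuming a broker at `localhost:9092` and the topic and group names `input-topic` and `my-group` (all hypothetical). `setBounded(OffsetsInitializer.latest())` makes the source stop at the offsets that were the latest ones when the job started, which is what lets the job finish; `RuntimeExecutionMode.BATCH` then runs it as a batch job. The `map` step is only a placeholder for your own processing.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedKafkaJob {

    // Builds a KafkaSource that stops at the offsets that were the latest
    // ones when the job started, so the source (and the job) can finish.
    static KafkaSource<String> buildSource(String address, String inputTopic, String consumerGroup) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(address)
                .setTopics(inputTopic)
                .setGroupId(consumerGroup)
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Stopping offsets: this call is what makes the source bounded.
                .setBounded(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical connection details; replace with your own.
        KafkaSource<String> source = buildSource("localhost:9092", "input-topic", "my-group");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // BATCH mode requires every source in the job to be bounded.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
           .map(value -> value.toUpperCase()) // placeholder processing step
           .print();

        // Returns once all records up to the stopping offsets are processed.
        env.execute("bounded-kafka-job");
    }
}
```

Note that BATCH mode does not use checkpoints, so in this setup offset commits depend on the Kafka consumer's own periodic auto-commit (`enable.auto.commit`) rather than on commit-on-checkpoint. A bounded source in the default STREAMING mode also finishes once it reaches its stopping offsets.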

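As for committing offsets to the Kafka brokers, this is done automatically on every successful checkpoint/savepoint (as long as checkpointing is enabled), so you don't have to do anything about it.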
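A minimal sketch of the streaming setup this relies on, with the same hypothetical broker, topic and group names. Commit-on-checkpoint only happens when checkpointing is enabled, so `enableCheckpointing` is the call that matters; the 10-second interval is an arbitrary example value, and `commit.offsets.on.checkpoint` is set explicitly only for clarity since it already defaults to `true`.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointCommitJob {

    // Checkpoint interval in milliseconds (an arbitrary example value).
    static final long CHECKPOINT_INTERVAL_MS = 10_000L;

    static StreamExecutionEnvironment createEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Offsets are committed to Kafka when a checkpoint completes,
        // so checkpointing has to be switched on.
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        return env;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = createEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // hypothetical broker
                .setTopics("input-topic")              // hypothetical topic
                .setGroupId("my-group")                // committed offsets are stored under this group
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Already the default; shown here for clarity.
                .setProperty("commit.offsets.on.checkpoint", "true")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
           .map(value -> value.length()) // placeholder processing step
           .print();

        env.execute("checkpoint-commit-job");
    }
}
```

Per the Flink Kafka connector documentation, the offsets committed to Kafka are for exposing consumer progress and monitoring; on recovery Flink restores from the offsets stored in its own checkpoints, not from the ones committed to Kafka.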
