我想使用Kafka源作为Apache Flink 1.12的有界数据源,我尝试使用FlinkKafkaConsumer连接器,但它给了我以下原因
由:java.lang.IllegalStateException:检测到' execute .runtime-mode'设置为'BATCH'的UNBOUNDED源。这种组合是不允许的,请将'execution.runtime-mode'设置为STREAMING或AUTOMATIC[flink-core-1.12.0.jar:1.12.0]
根据flink最新的文档,我们可以使用Kafka作为一个有界的源,但是没有提供一个例子来说明它是如何可能的,也没有任何地方提到它是最好的方法。
有人可以帮助我得到一些示例工作代码来实现这个用例
下面是一个例子:
KafkaSource<String> source = KafkaSource
.<String>builder()
.setBootstrapServers(...)
.setGroupId(...)
.setTopics(...)
.setDeserializer(...)
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source"));
更多信息请参见javadocs。