如何在Apache Flink 1.12中使用DataStream API Batch模式添加Kafka作为有界源.&l



我想使用Kafka源作为Apache Flink 1.12的有界数据源,我尝试使用FlinkKafkaConsumer连接器,但它给了我以下原因

由:java.lang.IllegalStateException:检测到' execute .runtime-mode'设置为'BATCH'的UNBOUNDED源。这种组合是不允许的,请将'execution.runtime-mode'设置为STREAMING或AUTOMATIC[flink-core-1.12.0.jar:1.12.0]

根据flink最新的文档,我们可以使用Kafka作为一个有界的源,但是没有提供一个例子来说明它是如何可能的,也没有任何地方提到它是最好的方法。

有人可以帮助我得到一些示例工作代码来实现这个用例

下面是一个例子:

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(...)
        .setGroupId(...)
        .setTopics(...)
        .setDeserializer(...)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())
        .build();
env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source"));

更多信息请参见javadocs。

相关内容

  • 没有找到相关文章

最新更新