Flink Kafka:在未收到消息的时间间隔后,优雅地关闭来自 kafka 源的 flink 使用消息



我已经将flinkkafkaconsumer作为源代码添加到我的streamexecution环境中。我想在特定时间(类似于 kafka polltime(没有收到新消息时关闭/停止 flink 消耗数据。 目前它无限期运行并阻止执行移动到下一步(验证消息(。 请建议是否有任何解决方法。

注意:我尝试使用反序列化的结束流,但它不起作用,因为流实际上是无限期的。

提前谢谢。

如果这是为了测试,那么一种方法是创建您自己的自定义源来"包装"FlinkKafkaConsumer。源的run()方法将从线程调用 Kafka 源的run()方法,传入包装真实收集器的收集器,并在收集任何内容时更新"上次收集时间"。在源的run()方法中,您将对此进行轮询,并在经过太多时间时调用 Kakfa 源的cancel()方法,然后退出。

说了这么多,通常对于单元测试,你会想要使用一个模拟的源代码,让你准确地控制正在生成什么,以及何时生成,而不是启动一个Kafka系统。

相关内容

  • 没有找到相关文章

最新更新