我已经将flinkkafkaconsumer作为源代码添加到我的streamexecution环境中。我想在特定时间(类似于 kafka polltime(没有收到新消息时关闭/停止 flink 消耗数据。 目前它无限期运行并阻止执行移动到下一步(验证消息(。 请建议是否有任何解决方法。
注意:我尝试使用反序列化的结束流,但它不起作用,因为流实际上是无限期的。
提前谢谢。
如果这是为了测试,那么一种方法是创建您自己的自定义源来"包装"FlinkKafkaConsumer
。源的run()
方法将从线程调用 Kafka 源的run()
方法,传入包装真实收集器的收集器,并在收集任何内容时更新"上次收集时间"。在源的run()
方法中,您将对此进行轮询,并在经过太多时间时调用 Kakfa 源的cancel()
方法,然后退出。
说了这么多,通常对于单元测试,你会想要使用一个模拟的源代码,让你准确地控制正在生成什么,以及何时生成,而不是启动一个Kafka系统。