我有一个关于Flink及其定时器服务的问题。
我有一个使用定时器的keyBy流,当计时器被调用时,我需要发送一个http请求,这可能需要一些时间来响应。我的问题是,我应该使http调用异步吗?还是flink已经将计时器调用作为一个具有async per key的新线程?
提前感谢
您可以使用ProcessFunction
来存储HTTP请求所需的数据,并且可以有一个计时器。当它触发时,您将发出一个包含请求数据的记录,随后的AsyncFunction
将使用该数据发出所需的定期请求。
如果您询问是否在单独的线程中为每个键调用onTimer
方法,那么我很确定它不是。因此,在这种情况下,您需要异步调用HTTP调用。
但老实说,一般来说,我认为使用onTimer
函数执行HTTP调用不是一个好主意。我对您的用例一无所知,但我认为您应该考虑使用不同的机制,比如创建侧输出,然后使用FlinkAsync
运算符。在onTimer
中使用异步调用可能很棘手,尤其是当您考虑到重试、超时和可能的失败时。
因此,根据评论,用例是每X分钟打一个服务电话,然后向Kafka发布一些内容。所以,你可以做的就是简单地拥有一个进程函数来调度定时器。每次触发计时器时,如果需要任何数据,您就会生成一些输出记录,其中包含请求所需的数据。然后使用Async
操作符实际执行请求,解析响应,并根据响应生成一些输出记录,然后将其保存到Kafka中。