我是Flink的新手,有一个关于从KeyedProcessedFunction的processElement
函数进行外部API调用的问题。
在我们当前的设置中,我们正在对上游服务进行同步API调用,同时根据收到的响应相应地更新KeyedProcessFunction
的状态。这是目前可行的,但我想知道这是否是最好的方法。
据我所知,Flink中处理外部数据访问的推荐方法是使用异步IO API。然而,考虑到RichAsyncFunction
是无状态的,我不太确定我如何能够在没有某种重构的情况下将其适应我们的用例。
做这件事最好的方法是什么?或者KeyedProcessFunction
一开始就不适合这个目的?谢谢你。
示例代码来说明我的观点:
public class StoppingLight extends KeyProcessedFunction<String, String> {
private ValueState<String> previousState;
.....
@Override
public void processElement(String newState, Context ctx, Collector<String> out) {
// getNextState() is a synchrnous HTTPS API call to some external endpoint
String nextState = getNextState(newState, previousState.value());
if(nextState == "Red") {
this.previousState = "Yellow";
} else if(nextState == "Green") {
this.previousState= "Red";
}
}
}
我考虑将http请求从KeyedProcessFunction中取出,并以某种方式将其链接到async-io操作符,但这可能不起作用,因为业务逻辑需要从API调用中获得返回值才能工作。
不建议从任何Flink用户函数(如KeyedProcessFunction)执行同步API调用,因为这会干扰检查点。当用户函数被阻塞等待同步I/O时,所涉及的操作员不能做任何事情。即使它在大多数情况下工作正常,这也有导致检查点失败的风险。更不用说低吞吐量和资源利用率了。
然而,我不确定推荐什么作为替代方案。(不确定为什么异步I/O不适合您的用例)