为什么在我的flink程序中多次执行sink操作?



我有一个flink程序,源代码来自kafka,我打开了三个窗口流:秒,分,小时。然后通过AsyncHttpSink将窗口结果发送给其他人扩展RichSinkFunction。但是我发现相同的窗口,一个kafka消息,相同的结果可能调用AsyncHttpSink.invoke()函数多次,这引起了我的好奇心。它不应该在同一个窗口中只发生一次,一个kafka消息,相同的结果吗?

hourOperator.addSink(httpSink(WindowType.h));
minuteOperator.addSink(httpSink(WindowType.m));
secondOperator.addSink(httpSink(WindowType.s));

/**
* http sink
*/
public class AsyncHttpSink extends RichSinkFunction<Tuple3<String, Long, Map<String, Tuple2<XXX, Object>>>> {

public AsyncHttpSink(WindowType windowType) {
this.windowType = windowType;
}
@Override
public void open(Configuration parameters) throws Exception {
httpClient = HttpAsyncClients.custom()
.build();
httpClient.start();
}
@Override
public void close() throws Exception {
httpClient.close();
}

@Override
public void invoke(Tuple3<String, Long, Map<String, Tuple2<XXX, Object>>> tuple3, Context context) throws Exception {

httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse response) {
try {
logger.info("[httpSink]http sink completed.");
} catch (IOException e) {
logger.error("[httpSink]http sink completed. exception:", e);
}
}
@Override
public void failed(Exception ex) {
logger.error("[httpSink]http sink failed.", ex);
}
@Override
public void cancelled() {
logger.info("[httpSink]http sink cancelled.");
}
});
}
}

如果这是一个键窗口,那么对于给定窗口,每个具有结果的不同键将分别报告其结果。

并且您可能有几个并行的接收器实例。

相关内容

  • 没有找到相关文章

最新更新