为了提高数据处理的性能,我们将事件存储到映射中,直到事件计数达到100时才进行处理。同时,以开放方式启动计时器,因此数据每60秒处理一次
当flink版本1.11.3,
在将flink版本升级到1.13.0之后
我发现有时事件是从Kafka中连续消耗的,但没有在RichFlatMapFunction中处理,这意味着数据丢失。重新启动服务后,它运行良好,但几个小时后,同样的事情再次发生。
这个flink版本有什么已知的问题吗?欢迎提出任何建议。
public class MyJob {
public static void main(String[] args) throws Exception {
...
DataStream<String> rawEventSource = env.addSource(flinkKafkaConsumer);
...
}
public class MyMapFunction extends RichFlatMapFunction<String, String> implements Serializable {
@Override
public void open(Configuration parameters) {
...
long periodTimeout = 60;
pool.scheduleAtFixedRate(() -> {
// processing data
}, periodTimeout, periodTimeout, TimeUnit.SECONDS);
}
@Override
public void flatMap(String message, Collector<String> out) {
// store event to map
// count event,
// when count = 100, start data processing
}
}
您应该避免在Flink函数中使用用户线程和计时器。支持的机制是将KeyedProcessFunction与处理时间计时器一起使用。