How to use a Flink count window



When I use a count window before my window function, running the job fails with the error "State migration is currently not supported". The full message is:

org.apache.flink.util.FlinkRuntimeException: org.apache.flink.util.StateMigrationException: State migration is currently not supported.
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.create(HeapKeyedStateBackend.java:216)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:741)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:225)
at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.open(EvictingWindowOperator.java:430)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: State migration is currently not supported.
at org.apache.flink.util.StateMigrationException.notSupported(StateMigrationException.java:42)
... 11 more

The code that builds my pipeline is:

SingleOutputStreamOperator singleOutputStream = iotDataSource.flatMap(new String2MetricFlatMapFunctionWithOutTimeAlignment())
    .filter(new WaterMarkFilterFunction(Time.seconds(WATERMARK_SIZE)))
    .filter(new DataCleanFilterFunction())
    .map(new RelateParkMapFunction())
    .assignTimestampsAndWatermarks(new CommonBoundedOutOfOrdernessTimestampExtractor(Time.seconds(MAX_OUT_OF_ORDERNESS)))
    .keyBy((KeySelector<Metric, String>) metric -> metric.getPark())
    .countWindow(10,1)
    .apply(new RealTimeAlarmWindowApplyFunction(CACHE_SIZE))
    .map(new ResultMapFunction())
    .disableChaining();

Maybe something is wrong with RealTimeAlarmWindowApplyFunction:

public class RealTimeAlarmWindowApplyFunction extends RichWindowFunction<Metric, ResultAction, String, GlobalWindow> {
    private Logger logger = LoggerFactory.getLogger(RealTimeAlarmWindowApplyFunction.class);
    private int cacheDataSize;

    public RealTimeAlarmWindowApplyFunction(int cacheDataSize) {
        this.cacheDataSize = cacheDataSize;
    }

    @Override
    public void apply(String key, GlobalWindow window, Iterable<Metric> input, Collector<ResultAction> out) throws Exception {
        //...
        // processing code that runs without error
        //...
    }
}

Update: the problem is now fixed. It went away once I dropped the savepoint option when running the Flink job:

$FLINK_HOME/bin/flink run -ynm jobname -yjm 1024 -ytm 4096 -yn 1 -ys 4 -qu root.users.root -flink_job_jar /root/flink_jar/test/flinkJobTest.jar -args 16

PS: Until now I had always run this job with the savepoint option, like this:

$FLINK_HOME/bin/flink run -ynm jobname -yjm 1024 -ytm 4096 -yn 1 -ys 4 -qu root.users.root -flink_job_jar /root/flink_jar/test/flinkJobTest.jar -savepoint_prefix hdfs://nameservice1/flink-checkpoint -args 16

But after I changed the window type, the job could no longer be restored from the old savepoint.
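
For anyone wondering what countWindow(10, 1) does per key: as far as I understand it, Flink implements it as a GlobalWindow with a count trigger that fires every `slide` elements and a count evictor that keeps only the last `size` elements, which fits the EvictingWindowOperator in the stack trace above. Below is a minimal plain-Java model of that behavior. It is not Flink code; the class name SlidingCountWindowSketch, its add method, and the key "parkA" are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of keyBy(...).countWindow(size, slide). Hypothetical sketch, not Flink API. */
public class SlidingCountWindowSketch {
    private final int size;   // maximum number of elements handed to the window function
    private final int slide;  // the window fires once every `slide` elements per key
    private final Map<String, Deque<Integer>> buffers = new HashMap<>();
    private final Map<String, Integer> sinceLastFire = new HashMap<>();

    public SlidingCountWindowSketch(int size, int slide) {
        this.size = size;
        this.slide = slide;
    }

    /** Adds one element for a key; returns the window contents if the window fires, else null. */
    public List<Integer> add(String key, int value) {
        Deque<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayDeque<>());
        buffer.addLast(value);

        // Trigger: count the elements seen for this key since the last firing.
        int count = sinceLastFire.merge(key, 1, Integer::sum);
        if (count < slide) {
            return null;
        }
        sinceLastFire.put(key, 0);

        // Evictor: drop the oldest elements so that at most `size` remain.
        while (buffer.size() > size) {
            buffer.removeFirst();
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        SlidingCountWindowSketch window = new SlidingCountWindowSketch(3, 1);
        for (int i = 1; i <= 5; i++) {
            System.out.println(window.add("parkA", i));
        }
    }
}
```

With size 3 and slide 1, the main method prints [1], [1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5]: the window fires on every element, even before it holds `size` elements. So with countWindow(10, 1) my apply function is called for every incoming Metric with up to the last 10 metrics for that park.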
