Apache Flink Process Function 状态未保持状态



我正在为Apache Flink 1.4中的processElement函数编写一些代码:

public class ProcessFunctionClass extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>{
private ListState<String> listState;
public void processElement(Tuple2<String, String> tuple2,  Context context, Collector<Tuple2<String, String>> collector) {
// if the state is empty, start a timer
if (listState.get().iterator().hasNext() == false)
context.timerService().registerEventTimeTimer(10000);
listState.add("someStringToBeStored");
// ...
}
}

当计时器到期时,我有这个功能:

public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
Iterable<String> strings = listState.get();
int cnt = 0;
int totalLength = 0;
Iterator<String> it = strings.iterator();
while (it.hasNext()) {
cnt++;
totalLength += it.next().length();
}
LOGGER.info("cnt is:" + cnt);
LOGGER.info("totalLength is:" + totalLength);
// clearing the state
listState.clear();
}

但是,每次运行应用程序时,cnt的值始终为 1,totalLength的值是当时已处理的特定字符串的长度。看起来状态没有保留在系统中。从这段代码中可以清楚地看出我在这里做错了什么?

进程函数使用键分区状态,这意味着每个键都有一个单独的列表。我的猜测是,在 10 秒的时间段内没有包含多个事件的键。

你的ProcessFunctionClass需要扩展 FlinkProcessFunction

ProcessFunction 不是用来保存状态的。您可以使用 WindowProcessFunctions 来保存窗口中存在的元素的状态。

相关内容

  • 没有找到相关文章

最新更新