我已经用翻滚窗口和QueryableState实现了Total WordCount示例。
我使用了 10 秒的时间窗口。直接打印结果时,输出是正确的;但当我通过可查询状态(Queryable State)并使用 QueryableStateClient 进行查询时,即使时间窗口已经切换,返回的仍然是上一个时间窗口缓存的最后结果。
例如,在 11:00:01 到 11:00:10 的时间窗口内,"Nirav" 的计数为 5。
当我在 11:00:50 查询 "Nirav" 时,它返回的仍是之前的计数 5。
所以我有两个问题:
- 这是 Flink 的 QueryableStateClient 的默认行为吗?即它会一直缓存同一个键的最后一次输出,直到该键有新的状态为止?
- 时间窗口完成后,如何清除之前的结果?
可查询实现如下
// Length of the tumbling window, in seconds.
int sec = 10;
Time seconds = Time.seconds(sec);
text.flatMap(new FlatMapFunction<String, WordWithCount>() {
        @Override
        public void flatMap(String value, Collector<WordWithCount> out) {
            // "\\s" is the regex whitespace class; a bare "\s" is either a
            // compile error (Java < 15) or a literal space (Java 15+).
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        }
    })
    .keyBy("word")
    .timeWindow(seconds)
    // Sum the counts of all records for the same word inside one window.
    .reduce(new ReduceFunction<WordWithCount>() {
        @Override
        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
            System.out.println("After time window fun:- a.word:" + a.word + ", a.count:" + a.count + ", b.word:" + b.word + ", b.count:" + b.count);
            return new WordWithCount(a.word, a.count + b.count);
        }
    })
    // Re-key the per-window results by word and publish them under the
    // queryable state name "wordCountQuery".
    .keyBy(wordWithCount -> wordWithCount.word)
    .asQueryableState("wordCountQuery", valueStateDescriptor);
完整实现如下:
SocketWindowWordCountWithQueryableStateWithTimeWindow.java
package com.nirav.modi;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCountWithQueryableStateWithTimeWindow {
public static void main(String[] args) throws Exception {
// the port to connect to
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port);
ReduceFunction<WordWithCount> reduceFunction = new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
System.out.println("reduce fun:- a.word:" + a.word + ", a.count:" + a.count + ", b.word:" + b.word + ", b.count:" + b.count);
return new WordWithCount(a.word, a.count + b.count);
}
};
// ReducingStateDescriptor<WordWithCount> descriptor = new ReducingStateDescriptor<WordWithCount>("wordCountQuery", reduceFunction, WordWithCount.class);
ValueStateDescriptor<WordWithCount> valueStateDescriptor = new ValueStateDescriptor<WordWithCount>("wordCountQuery", WordWithCount.class);
int sec = 10;
Time seconds = Time.seconds(sec);
text.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(seconds)
.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
System.out.println("After time window fun:- a.word:" + a.word + ", a.count:" + a.count + ", b.word:" + b.word + ", b.count:" + b.count);
return new WordWithCount(a.word, a.count + b.count);
}
}).keyBy(wordWithCount -> wordWithCount.word)
.asQueryableState("wordCountQuery", valueStateDescriptor);
env.getConfig().enableSysoutLogging();
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
System.out.println("[info] Window WordCount with Time Window Job ID: " + jobGraph.getJobID());
System.out.println();
env.execute("Socket Window WordCount with Time Window of " + sec + " seconds");
}
// Data type for words with count
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {
}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
QueryStateWithWindowTest.java
package com.nirav.modi;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import scala.tools.jline_embedded.console.ConsoleReader;
import java.io.PrintWriter;
import java.net.UnknownHostException;
import java.util.concurrent.CompletableFuture;
/**
 * Interactive console client that queries the {@code WordWithCount} value state of a
 * running job: every line typed at the prompt is used as the key to look up.
 */
public class QueryStateWithWindowTest {

    /**
     * Reads keys from the console and prints the count stored for each of them.
     *
     * @param args command-line arguments; {@code --jobId <jobId>} is the hex job ID of the
     *             job to query and {@code --queryableStateName <name>} is the name the
     *             state was published under
     * @throws Exception if the console cannot be set up or read
     */
    public static void main(String[] args) throws Exception {
        // the job and state to connect to
        final JobID jobId;
        final String queryableStateName;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            // getRequired throws when the key is absent; plain get() would return null
            // and the failure would only surface later, on the first query.
            jobId = JobID.fromHexString(params.getRequired("jobId"));
            queryableStateName = params.getRequired("queryableStateName");
        } catch (Exception e) {
            System.err.println("Missing or invalid arguments. Please run 'QueryStateWithWindowTest --jobId <jobId> --queryableStateName <queryableStateName>'");
            return;
        }

        try {
            ValueStateDescriptor<SocketWindowWordCountWithQueryableStateWithTimeWindow.WordWithCount> valueStateDescriptor =
                    new ValueStateDescriptor<>("wordCountQuery", SocketWindowWordCountWithQueryableStateWithTimeWindow.WordWithCount.class);

            QueryableStateClient client = new QueryableStateClient("truecomtelesoft", 9069);
            ExecutionConfig config = new ExecutionConfig();
            client.setExecutionConfig(config.enableClosureCleaner());

            ConsoleReader reader = new ConsoleReader();
            reader.setPrompt("$ ");
            PrintWriter out = new PrintWriter(reader.getOutput());

            String line;
            while ((line = reader.readLine()) != null) {
                // NOTE(review): the key is lower-cased here, but the job keys by the raw
                // word — mixed-case words may not be found; confirm the intended behavior.
                String key = line.toLowerCase().trim();
                out.printf("[info] Querying key '%s'\n", key);

                try {
                    long start = System.currentTimeMillis();
                    CompletableFuture<ValueState<SocketWindowWordCountWithQueryableStateWithTimeWindow.WordWithCount>> kvState =
                            client.getKvState(jobId, queryableStateName, key, BasicTypeInfo.STRING_TYPE_INFO, valueStateDescriptor);

                    // Block until the state for this key has been fetched.
                    SocketWindowWordCountWithQueryableStateWithTimeWindow.WordWithCount wordWithCount = kvState.get().value();
                    long end = System.currentTimeMillis();
                    long duration = Math.max(0, end - start);
                    out.printf("%d (query took %d ms)\n", wordWithCount.count, duration);
                } catch (Exception e) {
                    // A failed query must not end the session; report it and keep prompting.
                    out.println("Query failed because of the following Exception:");
                    e.printStackTrace(out);
                }
            }
        } catch (UnknownHostException e) {
            // Thrown when the host name given to the client cannot be resolved.
            e.printStackTrace();
        }
    }
}
"asQueryableState
创建的状态何时到期?"的简洁答案是永远不会。
asQueryableState 会被转换为一个运算符,该运算符对每条传入的记录调用 ValueState.update(value),以更新可查询的状态实例。这些值永远不会过期,只会在同一个键的新记录到达时被覆盖。在这个测试应用程序中,这意味着查询返回的是给定单词最近一次的非零计数。
显然,这并不是您想要的效果。您可以使用 ProcessFunction 让过时的条目过期:显式创建自己的键控托管状态(keyed managed state),并在每个计数条目中同时保存最近一次更新该条目的窗口的时间戳;然后使用定时器清除较旧的条目。
请参阅此 ProcessFunction 示例。若要使状态过期(该示例并未这样做),请调用 state.clear()。