如何在 Flink 流中在进程级别缓存局部变量?



在 Flink 任务实例中我需要访问远程 Web 服务才能在事件到来时获取一些数据,但是我不想每次事件来临时都访问远程 Web 服务,所以我需要将数据缓存在本地内存中并且可以被进程的所有任务访问,怎么办? 将数据存储在类级别的静态私有变量中?

比如下面的例子,如果把局部变量localCache设置在类Splitter,它缓存在算子级别而不是进程级别。

public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
***private object localCache ;***
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}

就像你说的。您将在RichFlatMapFunction中使用静态变量并在open中对其进行初始化。 在输入任何记录之前,将在每个任务管理器上调用open。请注意,为每个不同的插槽创建了一个拆分器实例,因此在大多数情况下,一个任务管理器上有多个拆分器实例。因此,您需要防止双重创建。

public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
private transient Object localCache;

@Override
public void open(Configuration parameters) throws Exception {
if (localCache == null)
localCache = ... ;
}
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}

可缩放的方法可以使用 Source 运算符实际执行对 Web 服务的调用,然后将结果写入流。 然后,您可以将该流作为广播流访问到您的运营商,从而将发送到广播流的一个对象(Web 调用结果(发送到接收运营商的每个实例。这将在集群中的所有计算机和 JVM 之间共享该单个 Web 调用的结果。 您还可以保留广播状态,并在集群纵向扩展时与操作员的新实例共享该状态。

相关内容

  • 没有找到相关文章

最新更新