I have a program in Flink (Java) that counts the distinct words in a data stream. I implemented it based on the word-count example, and I apply a second window to evaluate the distinct values. The program works correctly. However, I am worried that I am using two windows to compute the distinct count: in the first window I count the words; in the second window I switch the count of each word to 1, swap the word into the second position of the Tuple2, and then count the number of keys. Here are the input and output of my program:
// input:
aaa
aaa
bbb
ccc
bbb
aaa
// output:
(3,bbb-ccc-aaa)
If I remove the second window, it shows every evaluation of each key and holds the state from the previous window:
// input:
aaa
aaa
bbb
ccc
bbb
aaa
// output:
3> (1,bbb)
3> (2,bbb-aaa)
3> (3,bbb-aaa-ccc)
// wait for the first window to be evaluated.
// input:
aaa
aaa
bbb
ccc
bbb
aaa
// output:
3> (4,bbb-aaa-ccc-ccc)
3> (5,bbb-aaa-ccc-ccc-bbb)
3> (6,bbb-aaa-ccc-ccc-bbb-aaa)
My program:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCountDistinctSocketFilterQEP {

    public static void main(String[] args) throws Exception {
        new WordCountDistinctSocketFilterQEP();
    }

    public WordCountDistinctSocketFilterQEP() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // @formatter:off
        env.socketTextStream("localhost", 9000)
                .flatMap(new SplitterFlatMap())
                .keyBy(new MyKeySelector())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce(new CountReduceFunction())
                .map(new SwapMapFunction())
                .keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // TESTING REMOVING THIS WINDOW
                .reduce(new CountDistinctFunction())
                .print();
        // @formatter:on

        String executionPlan = env.getExecutionPlan();
        System.out.println("ExecutionPlan ........................ ");
        System.out.println(executionPlan);
        System.out.println("........................ ");

        env.execute("WordCountDistinctSocketFilterQEP");
    }

    // Swaps (word, count) to (1, word) so every record shares the key 1 and
    // all words land in the same key group for the distinct count.
    public static class SwapMapFunction implements MapFunction<Tuple2<String, Integer>, Tuple2<Integer, String>> {
        private static final long serialVersionUID = 5148172163266330182L;

        @Override
        public Tuple2<Integer, String> map(Tuple2<String, Integer> value) throws Exception {
            return Tuple2.of(1, value.f0);
        }
    }

    // Splits each line into (word, 1) pairs.
    public static class SplitterFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 3121588720675797629L;

        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    // Keys the stream by the word itself.
    public static class MyKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
        private static final long serialVersionUID = 2787589690596587044L;

        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0;
        }
    }

    // Sums the counts per word within a window.
    public static class CountReduceFunction implements ReduceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 8541031982462158730L;

        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)
                throws Exception {
            return Tuple2.of(value1.f0, value1.f1 + value2.f1);
        }
    }

    // Counts distinct words by summing the 1s and concatenating the words.
    public static class CountDistinctFunction implements ReduceFunction<Tuple2<Integer, String>> {
        private static final long serialVersionUID = -7077952757215699563L;

        @Override
        public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2)
                throws Exception {
            return Tuple2.of(value1.f0 + value2.f0, value1.f1 + "-" + value2.f1);
        }
    }
}
ReduceFunctions work better with Collections (Maps, Lists, Sets). If you map each word into a one-element Set, you can write a ReduceFunction that operates on Set<String>, and then you can do this with one ReduceFunction instead of two.
So SplitterFlatMap returns a stream of one-element Set<String>s, and MyKeySelector returns the first element of each set. The window stays as it is; change the reduce function to match the Set<String> type, and its guts become just value1.addAll(value2). At that point you have a set of all the unique words in your input, spread across however many parallel tasks you are running. Depending on where you are putting all this data when you are done, that may be enough. Otherwise, you can put a global window at the end and apply the same reduce function once more (explained further below). A minimal sketch of the single-ReduceFunction version follows.
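Here is a minimal sketch of that single-ReduceFunction version, assuming the same socket source and 5-second processing-time window as the question; class names like SetSplitter are illustrative, not from the original program:

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class DistinctWordsWithSets {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("localhost", 9000)
                .flatMap(new SetSplitter())
                .keyBy(new FirstElementKeySelector())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce(new SetUnionReduceFunction())
                .print(); // each set's size is the distinct count of its key group
        env.execute("DistinctWordsWithSets");
    }

    // Emits every word as a one-element Set<String>.
    public static class SetSplitter implements FlatMapFunction<String, Set<String>> {
        @Override
        public void flatMap(String sentence, Collector<Set<String>> out) {
            for (String word : sentence.split(" ")) {
                Set<String> set = new HashSet<>();
                set.add(word);
                out.collect(set);
            }
        }
    }

    // Keys each set by its first (and only) element.
    public static class FirstElementKeySelector implements KeySelector<Set<String>, String> {
        @Override
        public String getKey(Set<String> value) {
            return value.iterator().next();
        }
    }

    // The guts are just value1.addAll(value2): set union deduplicates words.
    public static class SetUnionReduceFunction implements ReduceFunction<Set<String>> {
        @Override
        public Set<String> reduce(Set<String> value1, Set<String> value2) {
            value1.addAll(value2);
            return value1;
        }
    }
}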
Your second issue is that this does not scale as it is. To some degree that is a philosophical problem: you cannot really get a global count across parallel instances without making every parallel instance talk to every other one. What you can do, though, is key the stream of split words by the actual word, and then use a (parallel) keyed, windowed ReduceFunction to get the set of distinct words within each key group. You can then have another ReduceFunction, one that is not parallel, which combines the parallel results. You want that second one windowed as well: WindowFunctions wait for all upstream operators to reach the right watermark before they trigger, so the window ensures that your non-parallel operator receives input from every parallel operator. The aggregation in the non-parallel operator is a simple concatenation, because the keying at the start guarantees that any given word lives in exactly one parallel slot (see the sketch below).
Obviously the single non-parallel operator can become a bottleneck, but its load is proportional to the total number of distinct words, which in practice may top out at around 10k words or so, given how English works.
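Continuing the illustrative sketch above, the two-stage version could look like this: the same set-union reduce applied once in parallel per key group, and once more in a windowAll (parallelism 1) that unions the partial sets into the global distinct set:

// Reuses SetSplitter, FirstElementKeySelector and SetUnionReduceFunction
// from the sketch above.
env.socketTextStream("localhost", 9000)
        .flatMap(new SetSplitter())
        .keyBy(new FirstElementKeySelector())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .reduce(new SetUnionReduceFunction())          // parallel partial sets
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .reduce(new SetUnionReduceFunction())          // non-parallel global union
        .print();                                      // set size = global distinct count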
I came up with this solution using AggregateFunction.
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordDistinctCountAggregateWindowSocket {

    public static void main(String[] args) throws Exception {
        new WordDistinctCountAggregateWindowSocket();
    }

    public WordDistinctCountAggregateWindowSocket() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Note: this selects event time, but the window below is a
        // processing-time window, so the setting has no effect here.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // @formatter:off
        env.socketTextStream("localhost", 9000)
                .flatMap(new SplitterFlatMap())
                .map(new SwapMap())
                .keyBy(new WordKeySelector())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(new DistinctCountAggregateFunction())
                .print();
        // @formatter:on

        String executionPlan = env.getExecutionPlan();
        System.out.println("ExecutionPlan ........................ ");
        System.out.println(executionPlan);
        System.out.println("........................ ");

        env.execute("WordDistinctCountAggregateWindowSocket");
    }

    // Collects the words of a window into a Set and reports the set's size.
    public static class DistinctCountAggregateFunction
            implements AggregateFunction<Tuple2<Integer, String>, DistinctCountWithTimestamp, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 996334987119123032L;

        @Override
        public DistinctCountWithTimestamp createAccumulator() {
            System.out.println("createAccumulator");
            return new DistinctCountWithTimestamp(new HashSet<String>(), 0, System.currentTimeMillis());
        }

        @Override
        public DistinctCountWithTimestamp add(Tuple2<Integer, String> value, DistinctCountWithTimestamp accumulator) {
            System.out.println("add");
            accumulator.distinctWords.add(value.f1);
            accumulator.distinctCount = accumulator.distinctWords.size();
            return accumulator;
        }

        @Override
        public Tuple2<String, Integer> getResult(DistinctCountWithTimestamp accumulator) {
            System.out.println("getResult");
            String items = String.join("-", accumulator.distinctWords);
            return Tuple2.of(items, accumulator.distinctCount);
        }

        @Override
        public DistinctCountWithTimestamp merge(DistinctCountWithTimestamp a, DistinctCountWithTimestamp b) {
            System.out.println("merge");
            // merge is only invoked for merging windows (e.g. session windows),
            // but returning null there would be a bug; union the accumulators.
            a.distinctWords.addAll(b.distinctWords);
            a.distinctCount = a.distinctWords.size();
            return a;
        }
    }

    // Splits each line into (word, 1) pairs.
    public static class SplitterFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 3121588720675797629L;

        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    // Swaps (word, 1) to (1, word); the constant 1 becomes the key below.
    public static class SwapMap implements MapFunction<Tuple2<String, Integer>, Tuple2<Integer, String>> {
        private static final long serialVersionUID = -1392476272305784921L;

        @Override
        public Tuple2<Integer, String> map(Tuple2<String, Integer> value) throws Exception {
            return Tuple2.of(value.f1, value.f0);
        }
    }

    // Keys every record by the constant 1, so all words share one key group.
    public static class WordKeySelector implements KeySelector<Tuple2<Integer, String>, Integer> {
        private static final long serialVersionUID = 2787589690596587044L;

        @Override
        public Integer getKey(Tuple2<Integer, String> value) throws Exception {
            return value.f0;
        }
    }

    // Accumulator: the distinct words seen so far plus bookkeeping fields.
    public static class DistinctCountWithTimestamp {
        public Set<String> distinctWords;
        public Integer distinctCount;
        public long lastModified;

        public DistinctCountWithTimestamp() {
            this.distinctWords = new HashSet<String>();
            this.distinctCount = 0;
            this.lastModified = 0L;
        }

        public DistinctCountWithTimestamp(Set<String> distinctWords, Integer distinctCount, long lastModified) {
            this.distinctWords = distinctWords;
            this.distinctCount = distinctCount;
            this.lastModified = lastModified;
        }

        @Override
        public String toString() {
            return "DistinctCountWithTimestamp [distinctWords=" + distinctWords + ", distinctCount=" + distinctCount
                    + ", lastModified=" + lastModified + "]";
        }
    }
}
I implemented my solution based on this example (https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/windows/HourlyTipsSolution.java):
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WordDistinctCountProcessTimeWindowSocket {

    public static void main(String[] args) throws Exception {
        new WordDistinctCountProcessTimeWindowSocket();
    }

    public WordDistinctCountProcessTimeWindowSocket() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Time time = Time.seconds(5);

        // @formatter:off
        env.socketTextStream("localhost", 9000)
                .flatMap(new SplitterFlatMap())
                .keyBy(new WordKeySelector())
                .timeWindow(time)
                .process(new DistinctProcessWindowFunction())
                .timeWindowAll(time)
                .reduce(new CountReduceFunction())
                .print();
        // @formatter:on

        String executionPlan = env.getExecutionPlan();
        System.out.println("ExecutionPlan ........................ ");
        System.out.println(executionPlan);
        System.out.println("........................ ");

        env.execute("WordDistinctCountProcessTimeWindowSocket");
    }

    // Splits each line into (word, 1) pairs.
    public static class SplitterFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 3121588720675797629L;

        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    // Keys the stream by the word itself, so each key group holds one word.
    public static class WordKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
        private static final long serialVersionUID = 2787589690596587044L;

        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0;
        }
    }

    // Each keyed window holds occurrences of exactly one word, so emit
    // (word, 1) once per window regardless of how often the word appeared.
    public static class DistinctProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
        private static final long serialVersionUID = -712802393634597999L;

        @Override
        public void process(String key,
                ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>.Context ctx,
                Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
            Tuple2<String, Integer> value = values.iterator().next();
            out.collect(Tuple2.of(value.f0, 1));
        }
    }

    // Non-parallel combine: concatenates the words and sums the 1s.
    public static class CountReduceFunction implements ReduceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 8047191633772408164L;

        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)
                throws Exception {
            return Tuple2.of(value1.f0 + "-" + value2.f0, value1.f1 + value2.f1);
        }
    }
}
In terms of performance, the best option is to use the HyperLogLog algorithm. Its Java implementation is described here; use the estimated cardinality of the large data set as the accumulator in your aggregate function.
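As an illustration only (not from the original answer), here is a sketch of that idea using the stream-lib library's HyperLogLog (com.clearspring.analytics:stream) as the accumulator of a Flink AggregateFunction; the log2m precision of 16 is an arbitrary choice:

import org.apache.flink.api.common.functions.AggregateFunction;

import com.clearspring.analytics.stream.cardinality.HyperLogLog;

// Sketch: approximate distinct count with a HyperLogLog accumulator.
// Assumes stream-lib's HyperLogLog, which is serializable and exposes
// offer/cardinality/merge; any comparable HLL implementation would do.
public class HllDistinctCountAggregate implements AggregateFunction<String, HyperLogLog, Long> {

    @Override
    public HyperLogLog createAccumulator() {
        return new HyperLogLog(16); // log2m = 16, an arbitrary precision here
    }

    @Override
    public HyperLogLog add(String word, HyperLogLog accumulator) {
        accumulator.offer(word); // hashes the word into the sketch
        return accumulator;
    }

    @Override
    public Long getResult(HyperLogLog accumulator) {
        return accumulator.cardinality(); // estimated number of distinct words
    }

    @Override
    public HyperLogLog merge(HyperLogLog a, HyperLogLog b) {
        try {
            return (HyperLogLog) a.merge(b);
        } catch (Exception e) {
            throw new RuntimeException("HLL merge failed", e);
        }
    }
}

The sketch keeps only a fixed-size register array per window instead of the full word set, which is what makes it attractive for large cardinalities, at the cost of an approximate result.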