假设我们有多个数据流,它们共享一些共同的特征。
例如,我们有一个教师流和一个学生流,它们都有一个年龄字段。如果我想从实时流中找出年龄最大的学生或老师,我可以实现如下运算符。
public MaxiumAgeFunc extends RichMapFunction<Student,Integer> {
int maxAge;
@Override
public void flatMap(Student s, Collector<Integer> collector) throws Exception {
if(s.age > maxAge){
maxAge = s.age;
}
collector.collect(maxAge);
}
}
要找出最年长的老师,我们需要实现一个类似的运算符,
如下所示public MaxiumAgeFunc extends RichMapFunction<Teacher,Integer> {
int maxAge;
@Override
public void flatMap(Teacher t, Collector<Integer> collector) throws Exception {
if(t.age > maxAge){
maxAge = t.age;
}
collector.collect(maxAge);
}
}
但实际上这两个运算符具有共同的过程逻辑,所以我的想法是定义一个父类,例如People。
public class People{
public Integer age;
}
然后,可以将学生和教师定义为他们的子类,并保留自己的字段。
public class Student extends People {
public Integer grade; // student grade
...
}
public class Student extends People {
public Integer subject; // the subject that teacher teaches
...
}
在这种情况下,我可以定义一个运算符,如下所示。
public MaxiumAgeFunc extends RichMapFunction<People,Integer> {
int maxAge;
@Override
public void flatMap(People p, Collector<Integer> collector) throws Exception {
if(t.age > maxAge){
maxAge = p.age;
}
collector.collect(maxAge);
}
}
但是当我尝试使用此运算符来实现 Flink 执行拓扑时,由于数据类型不匹配,它不起作用。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Student> studentStream = env.addSource(...);
DataStream<Teacher> teacherStream = env.addSource(...);
studentStream.map(new MaxiumAgeFunc()).print();
teacherStream.map(new MaxiumAgeFunc()).print();
这是我的问题,是否可以为具有公共字段的输入流制作抽象运算符?
这与其说是 Flink 问题,不如说是一个 Java:
您要做的是像这样使MaxiumAgeFunc
参数化
public MaxiumAgeFunc<T extends People> extends RichMapFunction<T, Integer> {
int maxAge;
@Override
public void flatMap(T p, Collector<Integer> collector) throws Exception {
if(t.age > maxAge){
maxAge = p.age;
}
collector.collect(maxAge);
}
}
然后像这样使用它
studentStream.map(new MaxiumAgeFunc<>()).print();
teacherStream.map(new MaxiumAgeFunc<>()).print();
编辑:
顺便说一句,您的函数无法使用检查点(因此从检查点恢复时会产生错误的结果(,我宁愿在全局窗口中使用聚合函数。
students
.windowAll(GlobalWindows.create())
.aggregate(new AggregateFunction<People, Integer, Integer>() {
@Override
public Integer createAccumulator() {
return -1;
}
@Override
public Integer add(People value, Integer accumulator) {
return Math.max(value.age, accumulator);
}
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b) {
return Math.max(a, b);
}
});